Skip to content

Commit 994d8ba

Browse files
committed
fix(filters): fix ExplodeFilter should support dotted notation
1 parent 2877c33 commit 994d8ba

File tree

5 files changed

+98
-16
lines changed

5 files changed

+98
-16
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedStruct.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Arrays;
2222
import java.util.Iterator;
2323
import java.util.LinkedList;
24-
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.Objects;
@@ -502,8 +501,14 @@ public int hashCode() {
502501
@Override
503502
public String toString() {
504503
return StreamSupport.stream(schema.spliterator(), false)
505-
.map( field -> field.name() + "=" + get(field))
506-
.collect(Collectors.joining(",", "[", "]"));
504+
.map( field -> {
505+
TypedValue value = get(field);
506+
return
507+
"name: " + field.name()
508+
+ ", type: " + value.schema().type()
509+
+ ", value: " + value.value();
510+
})
511+
.collect(Collectors.joining(", ", "[", "]"));
507512
}
508513

509514
/**

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/merger/DefaultTypeValueMerger.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828

2929
import java.util.LinkedList;
3030
import java.util.List;
31+
import java.util.Objects;
3132
import java.util.Set;
33+
import java.util.stream.Collectors;
3234

3335
/**
3436
* Default {@link TypeValueMerger} implementation which automatically
@@ -42,37 +44,54 @@ public class DefaultTypeValueMerger implements TypeValueMerger {
4244
* @param left the left {@link TypedStruct} to be merged.
4345
* @param right the right {@link TypedStruct} to be merged.
4446
* @param overwrite the left fields that must be overwritten.
45-
*
46-
* @return the new {@link TypedStruct} instance.
47+
* @return the new {@link TypedStruct} instance.
4748
*/
4849
public TypedStruct merge(final TypedStruct left,
4950
final TypedStruct right,
5051
final Set<String> overwrite) {
52+
return mergeObjects(left, right, overwrite.stream().map(Path::new).collect(Collectors.toSet()));
53+
}
5154

55+
private static TypedStruct mergeObjects(final TypedStruct left,
56+
final TypedStruct right,
57+
final Set<Path> overwrite) {
5258
if (left == null) return right;
5359
if (right == null) return left;
5460

5561
final TypedStruct struct = TypedStruct.create();
5662

5763
for (TypedField leftField : left) {
58-
final String leftFieldName = leftField.name();
59-
60-
boolean isOverwrite = overwrite.contains(leftFieldName);
64+
final String fieldName = leftField.name();
6165

62-
TypedValue typedValue = left.get(leftField);
66+
final TypedValue leftValue = left.get(leftField);
6367

64-
if (!right.has(leftFieldName)) {
65-
struct.put(leftFieldName, typedValue);
68+
if (!right.has(fieldName)) {
69+
struct.put(fieldName, leftValue);
6670
continue;
6771
}
6872

73+
boolean isOverwrite = overwrite.stream().anyMatch(it -> it.matches(fieldName));
74+
6975
if (isOverwrite) {
7076
continue; // skip the left field
7177
}
7278

73-
final TypedField rightField = right.field(leftFieldName);
79+
final TypedField rightField = right.field(fieldName);
7480
checkIfTypesAreCompatibleForMerge(leftField, rightField);
75-
TypedValue merged = merge(typedValue, right.get(rightField));
81+
82+
final TypedValue rightValue = right.get(rightField);
83+
final TypedValue merged;
84+
85+
if (leftField.type() == Type.STRUCT) {
86+
final TypedStruct mergedStruct = mergeObjects(
87+
leftValue.getStruct(),
88+
rightValue.getStruct(),
89+
overwrite.stream().map(Path::forward).filter(Objects::nonNull).collect(Collectors.toSet())
90+
);
91+
merged = TypedValue.struct(mergedStruct);
92+
} else {
93+
merged = merge(leftValue, rightValue);
94+
}
7695
struct.put(leftField.name(), merged);
7796
}
7897

@@ -84,6 +103,33 @@ public TypedStruct merge(final TypedStruct left,
84103
return struct;
85104
}
86105

106+
private static class Path {
107+
108+
private final String path;
109+
private final String field;
110+
private final String remaining;
111+
112+
Path(final String path) {
113+
this.path = path;
114+
if (path.contains(".")) {
115+
String[] split = path.split("\\.", 2);
116+
field = split[0];
117+
remaining = split[1];
118+
} else {
119+
field = path;
120+
remaining = null;
121+
}
122+
}
123+
124+
public boolean matches(final String field) {
125+
return this.field.equals(field) || path.equals(field);
126+
}
127+
128+
private Path forward() {
129+
return remaining == null ? null : new Path(remaining);
130+
}
131+
}
132+
87133
private static TypedValue merge(final TypedValue left, final TypedValue right) {
88134
List<Object> values = new LinkedList<>();
89135

@@ -170,4 +216,5 @@ private static void checkIfTypesAreCompatibleForMerge(final TypedField left, fin
170216
private static boolean isTypeNotEqual(final Schema left, final Schema right) {
171217
return left != null && right != null && left.type() != right.type();
172218
}
219+
173220
}

connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/data/merger/TypeValueMergerTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,20 @@ public void shouldMergeStructGivenTwoArrayFieldsWithEqualsValueType() {
132132
assertEquals(VALUE_A, merged.getArray(FIELD_VALUE_A).get(0));
133133
assertEquals(VALUE_B, merged.getArray(FIELD_VALUE_A).get(1));
134134
}
135+
136+
@Test
137+
public void shouldMergedGivenTwoStructWithCommonChildFields() {
138+
final TypedStruct structLeft = TypedStruct.create()
139+
.insert("a.b1.c", VALUE_A);
140+
141+
final TypedStruct structRight = TypedStruct.create()
142+
.insert("a.b2.c", VALUE_B);
143+
144+
final TypedStruct merged = merger.merge(structLeft, structRight, Collections.emptySet());
145+
146+
assertNotNull(merged);
147+
148+
assertEquals(VALUE_A, merged.find("a.b1.c").getString());
149+
assertEquals(VALUE_B, merged.find("a.b2.c").getString());
150+
}
135151
}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/ExplodeFilter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,16 @@ public ConfigDef configDef() {
6363
@Override
6464
protected RecordsIterable<TypedStruct> apply(final FilterContext context,
6565
final TypedStruct record) throws FilterException {
66-
TypedValue value = checkIsNotNull(record.get(config.source()));
66+
final TypedValue value = checkIsNotNull(record.find(config.source()));
67+
6768
if (value.type() != Type.ARRAY) {
6869
throw new FilterException(
69-
"Invalid type for field '" + config.source() + "', expected ARRAY, was " + value.type());
70+
"Invalid type for field '" + config.source() + "', expected ARRAY, was " + value.type());
7071
}
7172

7273
final List<TypedStruct> explode = value.getArray()
7374
.stream()
74-
.map(it -> TypedStruct.create().put(config.source(), TypedValue.any(it)))
75+
.map(it -> TypedStruct.create().insert(config.source(), TypedValue.any(it)))
7576
.collect(Collectors.toList());
7677

7778
return new RecordsIterable<>(explode);

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/ExplodeFilterTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ public void should_explode_input_given_sub_field_source() {
8080
Assert.assertEquals(PRIMITIVE_VALUES, extractStringValues(result, it -> it.getString("values")));
8181
}
8282

83+
@Test
84+
public void should_explode_input_given_sub_dotted_field_source() {
85+
filter.configure(new HashMap<String, String>() {{
86+
put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "field.values");
87+
}});
88+
89+
RecordsIterable<TypedStruct> result = filter.apply(null, create()
90+
.insert("message", "dummy")
91+
.insert("field.values", PRIMITIVE_VALUES), false);
92+
Assert.assertEquals(PRIMITIVE_VALUES.size(), result.size());
93+
Assert.assertEquals(PRIMITIVE_VALUES, extractStringValues(result, it -> it.find("field.values").getString()));
94+
}
95+
8396
private <T> List<T> extractStringValues(final RecordsIterable<TypedStruct> result,
8497
final Function<TypedStruct, T> extractor) {
8598
return result.stream().map(extractor).collect(Collectors.toList());

0 commit comments

Comments
 (0)