Skip to content

Commit cbf6a20

Browse files
committed
fix(filters): Fix AppendFilter should support substitution expression for field config
1 parent 4db95c3 commit cbf6a20

File tree

2 files changed

+56
-13
lines changed

2 files changed

+56
-13
lines changed

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilter.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2121
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
2222
import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext;
23+
import io.streamthoughts.kafka.connect.filepulse.expression.ValueExpression;
2324
import io.streamthoughts.kafka.connect.filepulse.expression.parser.regex.RegexExpressionParser;
2425
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
2526
import org.apache.kafka.common.config.ConfigDef;
@@ -36,10 +37,12 @@ public class AppendFilter extends AbstractMergeRecordFilter<AppendFilter> {
3637
private AppendFilterConfig config;
3738

3839
private List<Expression> values;
39-
private Expression target;
40+
private Expression fieldExpression;
4041

4142
private RegexExpressionParser parser;
4243

44+
private boolean mustEvaluateWriteExpression = true;
45+
4346
/**
4447
* {@inheritDoc}
4548
*/
@@ -50,8 +53,15 @@ public void configure(final Map<String, ?> props) {
5053

5154
parser = new RegexExpressionParser();
5255
// currently, multiple expressions is not supported
53-
values = Collections.singletonList(parser.parseExpression(config.value(), DEFAULT_ROOT_OBJECT, true));
54-
target = parser.parseExpression(config.field());
56+
values = Collections.singletonList(parser.parseExpression(config.value(), DEFAULT_ROOT_OBJECT));
57+
58+
fieldExpression = parser.parseExpression(config.field(), DEFAULT_ROOT_OBJECT);
59+
60+
// Check whether the field expression can be pre-evaluated
61+
if (fieldExpression instanceof ValueExpression) {
62+
fieldExpression = evaluateWriteExpression(new StandardEvaluationContext(new Object()));
63+
mustEvaluateWriteExpression = false;
64+
}
5565
}
5666

5767
/**
@@ -70,34 +80,42 @@ protected RecordsIterable<TypedStruct> apply(final FilterContext context,
7080
final TypedStruct record) throws FilterException {
7181

7282
InternalFilterContext internalContext = (InternalFilterContext) context;
83+
internalContext.setValue(record);
7384

7485
StandardEvaluationContext readEvaluationContext = new StandardEvaluationContext(
7586
internalContext,
7687
internalContext.variables()
7788
);
7889

79-
final String evaluatedTarget = target.readValue(readEvaluationContext, String.class);
80-
final Expression targetExpression = parser.parseExpression(evaluatedTarget, DEFAULT_ROOT_OBJECT, false);
90+
final Expression writeExpression = evaluateWriteExpression(readEvaluationContext);
8191

8292
final TypedStruct target = TypedStruct.struct();
8393
for (final Expression expression : values) {
8494

8595
internalContext.setValue(record);
86-
final Object typed = expression.readValue(readEvaluationContext);
96+
final Object value = expression.readValue(readEvaluationContext);
8797

88-
if (typed != null) {
98+
if (value != null) {
8999
internalContext.setValue(target);
90100

91101
final StandardEvaluationContext writeEvaluationContext = new StandardEvaluationContext(
92102
internalContext,
93103
internalContext.variables()
94104
);
95-
targetExpression.writeValue(typed, writeEvaluationContext);
105+
writeExpression.writeValue(value, writeEvaluationContext);
96106
}
97107
}
98108
return RecordsIterable.of(target);
99109
}
100110

111+
private Expression evaluateWriteExpression(final StandardEvaluationContext evaluationContext) {
112+
if (mustEvaluateWriteExpression) {
113+
final String evaluated = fieldExpression.readValue(evaluationContext, String.class);
114+
return parser.parseExpression(evaluated, DEFAULT_ROOT_OBJECT, false);
115+
}
116+
return fieldExpression;
117+
}
118+
101119
/**
102120
* {@inheritDoc}
103121
*/

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilterTest.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131

3232
public class AppendFilterTest {
3333

34+
private final static TypedStruct STRUCT = new TypedStruct().put("values", Arrays.asList("foo", "bar"));
35+
private final static String EXPRESSION = "{{ extract_array(values,0) }}-{{ extract_array(values,1) }}";
36+
3437
private AppendFilter filter;
3538

3639
private FilterContext context;
@@ -48,16 +51,38 @@ public void setUp() {
4851
}
4952

5053
@Test
51-
public void testGivenSubstitutionExpression() {
54+
public void shouldSupportSubstitutionExpressionForValueConfig() {
5255
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "target");
53-
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "{{ extract_array(values,0) }}-{{ extract_array(values,1) }}");
56+
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
57+
filter.configure(configs);
58+
59+
RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
60+
Assert.assertNotNull(output);
61+
TypedStruct result = output.collect().get(0);
62+
Assert.assertEquals("foo-bar", result.getString("target"));
63+
}
64+
65+
@Test
66+
public void shouldSupportPropertyExpressionForFieldConfig() {
67+
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.target");
68+
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
5469
filter.configure(configs);
5570

56-
final TypedStruct struct = new TypedStruct();
57-
struct.put("values", Arrays.asList("foo", "bar"));
58-
RecordsIterable<TypedStruct> output = filter.apply(context, struct);
71+
RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
5972
Assert.assertNotNull(output);
6073
TypedStruct result = output.collect().get(0);
6174
Assert.assertEquals("foo-bar", result.getString("target"));
6275
}
76+
77+
@Test
78+
public void shouldSupportSubstitutionExpressionForFieldConfig() {
79+
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "{{ extract_array(values,0) }}");
80+
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
81+
filter.configure(configs);
82+
83+
RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
84+
Assert.assertNotNull(output);
85+
TypedStruct result = output.collect().get(0);
86+
Assert.assertEquals("foo-bar", result.getString("foo"));
87+
}
6388
}

0 commit comments

Comments
 (0)