Skip to content

Commit 63176ca

Browse files
committed
fix(filters): fix regression for configuruing on failure filter (#114)
Resolves: #114
1 parent 2332c79 commit 63176ca

File tree

17 files changed

+203
-118
lines changed

17 files changed

+203
-118
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/RecordFilter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ public interface RecordFilter extends Configurable {
3636
@Override
3737
void configure(final Map<String, ?> configs);
3838

39+
default void configure(final Map<String, ?> configs,
40+
final RecordFilterProvider provider) {
41+
configure(configs);
42+
}
43+
3944
/**
4045
* Configuration specification for this filter.
4146
*
@@ -115,4 +120,11 @@ default RecordFilterPipeline<FileRecord<TypedStruct>> onFailure() {
115120
default boolean ignoreFailure() {
116121
return false;
117122
}
123+
124+
125+
@FunctionalInterface
126+
static public interface RecordFilterProvider {
127+
128+
RecordFilter getRecordForAlias(final String alias);
129+
}
118130
}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonFilterConfig.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,12 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.config;
2020

21-
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
22-
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
23-
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter;
24-
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
2521
import io.streamthoughts.kafka.connect.filepulse.filter.condition.ExpressionFilterCondition;
2622
import io.streamthoughts.kafka.connect.filepulse.filter.condition.FilterCondition;
27-
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
2823
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
2924
import org.apache.kafka.common.config.AbstractConfig;
3025
import org.apache.kafka.common.config.ConfigDef;
31-
import org.apache.kafka.connect.errors.ConnectException;
3226

33-
import java.util.ArrayList;
3427
import java.util.Collections;
3528
import java.util.List;
3629
import java.util.Map;
@@ -91,25 +84,9 @@ public boolean ignoreFailure() {
9184
return getBoolean(IGNORE_FAILURE_CONFIG);
9285
}
9386

94-
public RecordFilterPipeline<FileRecord<TypedStruct>> onFailure() {
95-
final List<String> filterAliases = getList(ON_FAILURE_CONFIG);
96-
97-
if (filterAliases == null) return null;
98-
99-
final List<RecordFilter> filters = new ArrayList<>(filterAliases.size());
100-
for (String alias : filterAliases) {
101-
final String prefix = "filters." + alias + ".";
102-
try {
103-
final RecordFilter filter = getClass(prefix + "type")
104-
.asSubclass(RecordFilter.class)
105-
.getDeclaredConstructor().newInstance();
106-
filter.configure(originalsWithPrefix(prefix));
107-
filters.add(filter);
108-
} catch (Exception e) {
109-
throw new ConnectException(e);
110-
}
111-
}
112-
return filters.isEmpty() ? null : new DefaultRecordFilterPipeline(filters);
87+
public List<String> onFailure() {
88+
final List<String> aliases = getList(ON_FAILURE_CONFIG);
89+
return aliases == null ? Collections.emptyList() : aliases;
11390
}
11491

11592
public static ConfigDef withOverwrite(final ConfigDef def) {

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractMergeRecordFilter.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,19 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.filter;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2122
import io.streamthoughts.kafka.connect.filepulse.data.merger.DefaultTypeValueMerger;
2223
import io.streamthoughts.kafka.connect.filepulse.data.merger.TypeValueMerger;
23-
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2424
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
2525

2626
import java.util.List;
27-
import java.util.Map;
2827
import java.util.Set;
2928
import java.util.stream.Collectors;
3029

31-
public abstract class AbstractMergeRecordFilter<T extends AbstractRecordFilter> extends AbstractRecordFilter<T> {
30+
public abstract class AbstractMergeRecordFilter<T extends AbstractRecordFilter<T>> extends AbstractRecordFilter<T> {
3231

33-
private TypeValueMerger merger = new DefaultTypeValueMerger();
32+
private final TypeValueMerger merger = new DefaultTypeValueMerger();
3433

35-
/**
36-
* {@inheritDoc}
37-
*/
38-
@Override
39-
public void configure(final Map<String, ?> props) {
40-
super.configure(props);
41-
}
4234

4335
/**
4436
* {@inheritDoc}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractRecordFilter.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.filter;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
2122
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2223
import io.streamthoughts.kafka.connect.filepulse.filter.condition.FilterCondition;
23-
import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
2424
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
2525
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
2626
import org.apache.kafka.common.config.ConfigDef;
2727

2828
import java.util.Map;
29+
import java.util.stream.Collectors;
2930

30-
public abstract class AbstractRecordFilter<T extends AbstractRecordFilter> implements RecordFilter {
31+
public abstract class AbstractRecordFilter<T extends AbstractRecordFilter<T>> implements RecordFilter {
3132

3233
private RecordFilterPipeline<FileRecord<TypedStruct>> failurePipeline;
3334

@@ -45,11 +46,24 @@ public abstract class AbstractRecordFilter<T extends AbstractRecordFilter> imple
4546
* {@inheritDoc}
4647
*/
4748
@Override
48-
public void configure(final Map<String, ?> props) {
49+
public void configure(final Map<String, ?> configs) {
50+
// intentionally left blank
51+
}
52+
53+
/**
54+
* {@inheritDoc}
55+
*/
56+
@Override
57+
public void configure(final Map<String, ?> props, final RecordFilterProvider provider) {
4958
final CommonFilterConfig config = new CommonFilterConfig(configDef(), props);
5059
condition = config.condition();
51-
failurePipeline = config.onFailure();
5260
ignoreFailure = config.ignoreFailure();
61+
if (!config.onFailure().isEmpty()) {
62+
failurePipeline = new DefaultRecordFilterPipeline(config.onFailure().stream()
63+
.map(provider::getRecordForAlias)
64+
.collect(Collectors.toList()));
65+
}
66+
configure(props);
5367
}
5468

5569
/**

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/AppendFilterTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void setUp() {
5656
public void shouldSupportPropertyExpressionForValueConfig() {
5757
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$.target");
5858
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
59-
filter.configure(configs);
59+
filter.configure(configs, alias -> null);
6060

6161
RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
6262
Assert.assertNotNull(output);
@@ -68,7 +68,7 @@ public void shouldSupportPropertyExpressionForValueConfig() {
6868
public void shouldSupportPropertyExpressionForFieldConfig() {
6969
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$.target");
7070
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
71-
filter.configure(configs);
71+
filter.configure(configs, alias -> null);
7272

7373
RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
7474
Assert.assertNotNull(output);
@@ -80,7 +80,7 @@ public void shouldSupportPropertyExpressionForFieldConfig() {
8080
public void shouldSupportSubstitutionExpressionForFieldConfig() {
8181
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "{{ '$.'extract_array($.values,0) }}");
8282
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, EXPRESSION);
83-
filter.configure(configs);
83+
filter.configure(configs, alias -> null);
8484

8585
RecordsIterable<TypedStruct> output = filter.apply(context, STRUCT);
8686
Assert.assertNotNull(output);
@@ -92,7 +92,7 @@ public void shouldSupportSubstitutionExpressionForFieldConfig() {
9292
public void shouldSupportPropertyExpressionWithScopeForFieldConfig() {
9393
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$topic");
9494
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "my-topic-{{ extract_array($.values,0) }}");
95-
filter.configure(configs);
95+
filter.configure(configs, alias -> null);
9696
filter.apply(context, STRUCT);
9797
Assert.assertEquals("my-topic-foo", context.topic());
9898
}
@@ -102,7 +102,7 @@ public void shouldOverwriteExistingValueGivenOverwriteTrue() {
102102
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.field");
103103
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "bar");
104104
configs.put(AppendFilterConfig.APPEND_OVERWRITE_CONFIG, "true");
105-
filter.configure(configs);
105+
filter.configure(configs, alias -> null);
106106
final TypedStruct input = TypedStruct.create().put("field", "foo");
107107
RecordsIterable<TypedStruct> results = filter.apply(context, input, false);
108108
Assert.assertEquals("bar", results.last().getString("field"));
@@ -113,7 +113,7 @@ public void shouldMergeExistingValueGivenOverwriteFalse() {
113113
configs.put(AppendFilterConfig.APPEND_FIELD_CONFIG, "$value.field");
114114
configs.put(AppendFilterConfig.APPEND_VALUE_CONFIG, "bar");
115115
configs.put(AppendFilterConfig.APPEND_OVERWRITE_CONFIG, "false");
116-
filter.configure(configs);
116+
filter.configure(configs, alias -> null);
117117
final TypedStruct input = TypedStruct.create().put("field", "foo");
118118
RecordsIterable<TypedStruct> results = filter.apply(context, input, false);
119119
Assert.assertEquals("[foo, bar]", results.last().getArray("field").toString());

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/ConvertFilterTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void setUp() {
5454
public void should_convert_value_given_valid_field() {
5555
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field");
5656
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "boolean");
57-
filter.configure(configs);
57+
filter.configure(configs, alias -> null);
5858

5959
TypedStruct struct = TypedStruct.create().put("field", "yes");
6060
List<TypedStruct> results = filter.apply(context, struct, false).collect();
@@ -71,7 +71,7 @@ public void should_convert_value_given_valid_field() {
7171
public void should_convert_value_given_valid_path() {
7272
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field.child");
7373
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "boolean");
74-
filter.configure(configs);
74+
filter.configure(configs, alias -> null);
7575

7676
TypedStruct struct = TypedStruct.create().insert("field.child", "yes");
7777
List<TypedStruct> results = filter.apply(context, struct, false).collect();
@@ -89,7 +89,7 @@ public void should_fail_given_invalid_path_and_ignore_missing_false() {
8989
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field");
9090
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "boolean");
9191
configs.put(ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG, "false");
92-
filter.configure(configs);
92+
filter.configure(configs, alias -> null);
9393
filter.apply(context, TypedStruct.create(), false).collect();
9494
}
9595

@@ -98,7 +98,7 @@ public void should_fail_given_not_convertible_value_and_not_default() {
9898
configs.put(ConvertFilterConfig.CONVERT_FIELD_CONFIG, "field");
9999
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "integer");
100100
configs.put(ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG, "false");
101-
filter.configure(configs);
101+
filter.configure(configs, alias -> null);
102102

103103
TypedStruct struct = TypedStruct.create().insert("field", "dummy");
104104
filter.apply(context, struct, false).collect();
@@ -110,7 +110,7 @@ public void should_use_default_given_not_convertible_value() {
110110
configs.put(ConvertFilterConfig.CONVERT_TO_CONFIG, "integer");
111111
configs.put(ConvertFilterConfig.CONVERT_DEFAULT_CONFIG, "-1");
112112
configs.put(ConvertFilterConfig.CONVERT_IGNORE_MISSING_CONFIG, "false");
113-
filter.configure(configs);
113+
filter.configure(configs, alias -> null);
114114

115115
TypedStruct struct = TypedStruct.create().insert("field", "dummy");
116116
List<TypedStruct> results = filter.apply(context, struct, false).collect();

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void shouldConvertToEpochTimeGivenNoTimezoneAndNoLocale() {
5353
configs.put(DateFilterConfig.DATE_TARGET_CONFIG, "$.timestamp");
5454
configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("yyyy-MM-dd'T'HH:mm:ss"));
5555

56-
filter.configure(configs);
56+
filter.configure(configs, alias -> null);
5757
TypedStruct struct = TypedStruct.create().put("date", "2001-07-04T12:08:56");
5858
List<TypedStruct> results = filter.apply(context, struct, false).collect();
5959

@@ -69,7 +69,7 @@ public void shouldConvertToEpochTimeGivenTimezone() {
6969
configs.put(DateFilterConfig.DATE_TIMEZONE_CONFIG, "Europe/Paris");
7070
configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("yyyy-MM-dd'T'HH:mm:ss"));
7171

72-
filter.configure(configs);
72+
filter.configure(configs, alias -> null);
7373
TypedStruct struct = TypedStruct.create().put("date", "2001-07-04T14:08:56");
7474
List<TypedStruct> results = filter.apply(context, struct, false).collect();
7575

@@ -85,7 +85,7 @@ public void shouldConvertToEpochTimeGivenLocale() {
8585
configs.put(DateFilterConfig.DATE_LOCALE_CONFIG, "fr_FR");
8686
configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("EEEE, d MMMM yyyy HH:mm:ss"));
8787

88-
filter.configure(configs);
88+
filter.configure(configs, alias -> null);
8989
TypedStruct struct = TypedStruct.create().put("date", "mercredi, 4 juillet 2001 12:08:56");
9090
List<TypedStruct> results = filter.apply(context, struct, false).collect();
9191

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DelimitedRowFileInputFilterTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void setUp() {
5656

5757
@Test
5858
public void should_auto_generate_schema_given_no_schema_field() {
59-
filter.configure(configs);
59+
filter.configure(configs, alias -> null);
6060
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
6161
Assert.assertNotNull(output);
6262
Assert.assertEquals(1, output.size());
@@ -70,7 +70,7 @@ public void should_auto_generate_schema_given_no_schema_field() {
7070
@Test
7171
public void should_extract_column_names_from_given_field() {
7272
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
73-
filter.configure(configs);
73+
filter.configure(configs, alias -> null);
7474
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
7575
Assert.assertNotNull(output);
7676
Assert.assertEquals(1, output.size());
@@ -85,7 +85,7 @@ public void should_extract_column_names_from_given_field() {
8585
public void should_extract_repeated_columns_names_from_given_field() {
8686
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
8787
configs.put(READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG, "true");
88-
filter.configure(configs);
88+
filter.configure(configs, alias -> null);
8989

9090
final TypedStruct input = TypedStruct.create()
9191
.put("message", "value1;value2-1;value2-2;value2-3;value3;value2-4")
@@ -106,7 +106,7 @@ public void should_extract_repeated_columns_names_from_given_field() {
106106
public void should_fail_given_repeated_columns_names_and_duplicate_not_allowed() {
107107
configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
108108
configs.put(READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG, "false");
109-
filter.configure(configs);
109+
filter.configure(configs, alias -> null);
110110

111111
final TypedStruct input = TypedStruct.create()
112112
.put("message", "value1;value2-1;value2-2;value2-3;value3;value2-4")
@@ -119,7 +119,7 @@ public void should_fail_given_repeated_columns_names_and_duplicate_not_allowed()
119119
@Test
120120
public void should_use_configured_schema() {
121121
configs.put(READER_FIELD_COLUMNS_CONFIG, "c1:STRING;c2:INTEGER;c3:BOOLEAN");
122-
filter.configure(configs);
122+
filter.configure(configs, alias -> null);
123123
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
124124
Assert.assertNotNull(output);
125125
Assert.assertEquals(1, output.size());

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/FailFilterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void setUp() {
5454
public void shouldThrownExceptionWhenConditionIsTrue() {
5555
configs.put(FailFilterConfig.MESSAGE_CONFIG, "Unexpected error");
5656
FailFilter filter = new FailFilter();
57-
filter.configure(configs);
57+
filter.configure(configs, alias -> null);
5858

5959
try {
6060
filter.apply(context, DEFAULT_DATA, false);
@@ -70,7 +70,7 @@ public void shouldThrownExceptionWhenConditionIsTrue() {
7070
public void shouldEvaluateMessageExpression() {
7171
configs.put(FailFilterConfig.MESSAGE_CONFIG, "Unexpected error : {{ $value.message }}");
7272
FailFilter filter = new FailFilter();
73-
filter.configure(configs);
73+
filter.configure(configs, alias -> null);
7474

7575
try {
7676
filter.apply(context, DEFAULT_DATA, false);

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void setUp() {
5050
@Test
5151
public void testGivenDefaultProperties() {
5252
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
53-
filter.configure(configs);
53+
filter.configure(configs, alias -> null);
5454
List<TypedStruct> results = filter.apply(null, DATA, false).collect();
5555

5656
Assert.assertEquals(1, results.size());
@@ -66,7 +66,7 @@ public void testGivenDefaultProperties() {
6666
public void testGivenOverwriteProperty() {
6767
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
6868
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
69-
filter.configure(configs);
69+
filter.configure(configs, alias -> null);
7070
List<TypedStruct> results = filter.apply(null, DATA, false).collect();
7171

7272
Assert.assertEquals(1, results.size());
@@ -80,7 +80,7 @@ public void testGivenOverwriteProperty() {
8080
public void testGivenNotMatchingInput() {
8181
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
8282
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
83-
filter.configure(configs);
83+
filter.configure(configs, alias -> null);
8484
filter.apply(null, TypedStruct.create().put("message", "BAD INPUT"), false);
8585
}
8686

@@ -89,7 +89,7 @@ public void testGivenPatternWithNoGroupWhenCapturedNameOnlyIsFalse() {
8989
configs.put(GrokFilterConfig.GROK_ROW_PATTERN_CONFIG, GROK_PATTERN);
9090
configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
9191
configs.put(GrokFilterConfig.GROK_ROW_NAMED_CAPTURES_ONLY_CONFIG, "false");
92-
filter.configure(configs);
92+
filter.configure(configs, alias -> null);
9393
List<TypedStruct> results = filter.apply(null, DATA, false).collect();
9494

9595
Assert.assertEquals(1, results.size());

0 commit comments

Comments
 (0)