Commit 5a65c3b
feat(filters): add support for repeated columns to DelimitedRowFilter (#106)
This commit adds a new config property: duplicateColumnsAsArray

Resolves: GH-106

Parent: 61a890a
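For a sense of how the new property is wired up, here is a minimal, hypothetical sketch (not part of the commit) that enables it through the filter's public configure(Map) method shown in the diffs below. The keys "duplicateColumnsAsArray" and "extractColumnName" come from DelimitedRowFilterConfig; the "headers" field name and the omission of any other required properties are assumptions.

import io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter;

import java.util.HashMap;
import java.util.Map;

public class DuplicateColumnsConfigSketch {

    public static void main(String[] args) {
        final Map<String, Object> props = new HashMap<>();
        // Hypothetical: infer column names from a record field named "headers".
        props.put("extractColumnName", "headers");
        // New in this commit; defaults to "false".
        props.put("duplicateColumnsAsArray", "true");

        final DelimitedRowFilter filter = new DelimitedRowFilter();
        filter.configure(props);
    }
}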

8 files changed: +180, -53 lines
checkstyle/checkstyle.xml
Lines changed: 1 addition & 0 deletions

@@ -44,6 +44,7 @@
 
     <module name="Header">
         <property name="headerFile" value="${basedir}/checkstyle/apache-header" />
+        <property name="ignoreLines" value="1, 2"/>
     </module>
 
     <module name="TreeWalker">

checkstyle/suppressions.xml
Lines changed: 1 addition & 1 deletion

@@ -67,6 +67,6 @@
     <suppress checks="Header" files="log4j.properties"/>
 
     <suppress checks="[a-zA-Z0-9]*" files="src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/*"/>
-    <suppress checks="BooleanExpressionComplexity" files="Split.java"/>
+    <suppress checks="BooleanExpressionComplexity" files="io.streamthoughts.kafka.connect.filepulse.internal.StringUtils"/>
 
</suppressions>

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/Schema.java
Lines changed: 11 additions & 0 deletions

@@ -135,6 +135,17 @@ static MapSchema map(final Map<String ,?> value, final Schema valueSchema) {
         return valueSchema == null ? new LazyMapSchema(value) : new MapSchema(valueSchema);
     }
 
+    /**
+     * Gets the schema for type ARRAY.
+     *
+     * @param valueSchema the {@link Schema} instance.
+     *
+     * @return the {@link Schema} instance.
+     */
+    static ArraySchema array(final Schema valueSchema) {
+        return new ArraySchema(valueSchema);
+    }
+
     /**
      * Gets the schema for type ARRAY.
      *
…/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/StringUtils.java (new file)
Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019-2021 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.internal;
+
+public class StringUtils {
+
+    /**
+     * @see String#split(String).
+     */
+    public static boolean isFastSplit(final String regex) {
+        char ch = 0;
+        return
+            ((regex.length() == 1 && ".$|()[{^?*+\\".indexOf(ch = regex.charAt(0)) == -1) ||
+            (regex.length() == 2 &&
+                regex.charAt(0) == '\\' &&
+                (((ch = regex.charAt(1))-'0')|('9'-ch)) < 0 &&
+                ((ch-'a')|('z'-ch)) < 0 &&
+                ((ch-'A')|('Z'-ch)) < 0)) &&
+            (ch < Character.MIN_HIGH_SURROGATE ||
+                ch > Character.MAX_LOW_SURROGATE);
+    }
+}
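A quick sketch (not part of the commit) of what isFastSplit reports, assuming the new class above is on the classpath. It mirrors the fast path of String#split: a single non-metacharacter, or a backslash escape of a non-alphanumeric ASCII character, can be split without compiling a java.util.regex.Pattern.

import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;

public class FastSplitSketch {

    public static void main(String[] args) {
        // A single literal character that is not a regex metacharacter: fast path applies.
        System.out.println(StringUtils.isFastSplit(";"));   // true
        // '|' is a regex metacharacter, so a compiled Pattern is required.
        System.out.println(StringUtils.isFastSplit("|"));   // false
        // An escaped metacharacter such as "\\." is still a one-character literal.
        System.out.println(StringUtils.isFastSplit("\\.")); // true
        // "\\d" is a character class and needs real regex machinery.
        System.out.println(StringUtils.isFastSplit("\\d")); // false
    }
}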

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/Split.java
Lines changed: 2 additions & 17 deletions

@@ -28,6 +28,7 @@
 import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction;
 import io.streamthoughts.kafka.connect.filepulse.expression.function.GenericArgument;
 import io.streamthoughts.kafka.connect.filepulse.expression.function.MissingArgumentValue;
+import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
 
 import java.util.Arrays;
 import java.util.regex.Pattern;

@@ -57,7 +58,7 @@ public Arguments<?> prepare(final Expression[] args) {
 
         final String regex = ((ValueExpression) args[1]).value().getString();
         Object regexArgument;
-        if (isFastSplit(regex)) {
+        if (StringUtils.isFastSplit(regex)) {
             regexArgument = regex;
         } else {
             regexArgument = Pattern.compile(regex);

@@ -87,20 +88,4 @@ public TypedValue apply(Arguments<GenericArgument> args) {
         }
         return TypedValue.array(Arrays.asList(split), Type.STRING);
     }
-
-    /**
-     * @see String#split(String).
-     */
-    private static boolean isFastSplit(final String regex) {
-        char ch = 0;
-        return
-            ((regex.length() == 1 && ".$|()[{^?*+\\".indexOf(ch = regex.charAt(0)) == -1) ||
-            (regex.length() == 2 &&
-                regex.charAt(0) == '\\' &&
-                (((ch = regex.charAt(1))-'0')|('9'-ch)) < 0 &&
-                ((ch-'a')|('z'-ch)) < 0 &&
-                ((ch-'A')|('Z'-ch)) < 0)) &&
-            (ch < Character.MIN_HIGH_SURROGATE ||
-                ch > Character.MAX_LOW_SURROGATE);
-    }
 }
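To illustrate the refactoring above (a sketch, not part of the commit, assuming the new StringUtils is on the classpath): prepare() keeps the raw delimiter string whenever the fast path applies and precompiles the Pattern once otherwise, so apply() never recompiles the regex per record.

import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;

import java.util.regex.Pattern;

public class SplitArgumentSketch {

    public static void main(String[] args) {
        final String regex = "\\s+";
        // The same decision Split.prepare() makes: a raw String for the fast
        // path, a precompiled Pattern for everything else.
        final Object regexArgument = StringUtils.isFastSplit(regex)
                ? regex
                : Pattern.compile(regex);
        System.out.println(regexArgument.getClass().getSimpleName()); // Pattern
    }
}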

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/DelimitedRowFilterConfig.java
Lines changed: 11 additions & 0 deletions

@@ -38,6 +38,10 @@ public class DelimitedRowFilterConfig extends CommonFilterConfig {
     public static final String READER_FIELD_TRIM_COLUMN_DOC = "Remove the leading and trailing whitespaces from all columns.";
     public static final boolean READER_FIELD_TRIM_COLUMN_DEFAULT = false;
 
+    public static final String READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG = "duplicateColumnsAsArray";
+    public static final String READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_DOC = "Treat duplicate columns as an array. If false and a record contains duplicate columns an exception will be thrown.";
+    public static final String READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_DEFAULT = "false";
+
     public static final String READER_EXTRACT_COLUMN_NAME_CONFIG = "extractColumnName";
     public static final String READER_EXTRACT_COLUMN_NAME_DOC = "Define the field from which the schema should be detected (all columns will be of type 'withMessage')";
 

@@ -72,6 +76,10 @@ public boolean isAutoGenerateColumnNames() {
         return getBoolean(READER_AUTO_GENERATE_COLUMN_NAME_CONFIG);
     }
 
+    public boolean isDuplicateColumnsAsArray() {
+        return getBoolean(READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG);
+    }
+
     public StructSchema schema() {
         final String columns = getString(READER_FIELD_COLUMNS_CONFIG);
         if (columns == null) return null;

@@ -114,6 +122,9 @@ public static ConfigDef configDef() {
                 .define(READER_AUTO_GENERATE_COLUMN_NAME_CONFIG, ConfigDef.Type.BOOLEAN, READER_AUTO_GENERATE_COLUMN_NAME_DEFAULT,
                         ConfigDef.Importance.HIGH, READER_AUTO_GENERATE_COLUMN_NAME_DOC)
 
+                .define(READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_CONFIG, ConfigDef.Type.BOOLEAN, READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_DEFAULT,
+                        ConfigDef.Importance.LOW, READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_DOC)
+
                 .define(READER_FIELD_TRIM_COLUMN_CONFIG, ConfigDef.Type.BOOLEAN, READER_FIELD_TRIM_COLUMN_DEFAULT,
                         ConfigDef.Importance.LOW, READER_FIELD_TRIM_COLUMN_DOC);
     }
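A side note on the define(...) call above, sketched outside the commit: ConfigDef.Type.BOOLEAN parses the String default "false" used by READER_FIELD_DUPLICATE_COLUMNS_AS_ARRAY_DEFAULT, so getBoolean(...) returns a Boolean even though the constant is declared as a String.

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;

public class BooleanDefaultSketch {

    public static void main(String[] args) {
        final ConfigDef def = new ConfigDef()
                .define("duplicateColumnsAsArray", ConfigDef.Type.BOOLEAN, "false",
                        ConfigDef.Importance.LOW, "Treat duplicate columns as an array.");

        // No override supplied, so the parsed String default comes back as a Boolean.
        final AbstractConfig config = new AbstractConfig(def, Collections.emptyMap());
        System.out.println(config.getBoolean("duplicateColumnsAsArray")); // false
    }
}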

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DelimitedRowFilter.java
Lines changed: 75 additions & 32 deletions

@@ -24,13 +24,21 @@
 import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedField;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
 import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig.READER_AUTO_GENERATE_COLUMN_NAME_CONFIG;
 import static io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig.READER_EXTRACT_COLUMN_NAME_CONFIG;

@@ -48,6 +56,10 @@ public class DelimitedRowFilter extends AbstractRecordFilter<DelimitedRowFilter>
 
     private StructSchema schema;
 
+    private final Map<Integer, TypedField> columnsTypesByIndex = new HashMap<>();
+
+    private Pattern pattern = null;
+
     /**
      * {@inheritDoc}
      */

@@ -59,19 +71,25 @@ public void configure(final Map<String, ?> configs) {
         if (isMandatoryConfigsMissing()) {
             StringJoiner joiner = new StringJoiner(",", "[", "]");
             final String mandatory = joiner
-                .add(READER_AUTO_GENERATE_COLUMN_NAME_CONFIG)
-                .add(READER_EXTRACT_COLUMN_NAME_CONFIG)
-                .add(READER_FIELD_COLUMNS_CONFIG).toString();
+                    .add(READER_AUTO_GENERATE_COLUMN_NAME_CONFIG)
+                    .add(READER_EXTRACT_COLUMN_NAME_CONFIG)
+                    .add(READER_FIELD_COLUMNS_CONFIG).toString();
             throw new ConfigException("At least one of those parameters should be configured " + mandatory);
         }
 
+        if (!StringUtils.isFastSplit(this.configs.delimiter())) pattern = Pattern.compile(this.configs.delimiter());
+
         this.schema = this.configs.schema();
+        if (schema != null) {
+            final List<TypedField> fields = schema.fields();
+            IntStream.range(0, fields.size()).forEach(i -> columnsTypesByIndex.put(i, fields.get(i)));
+        }
     }
 
     private boolean isMandatoryConfigsMissing() {
         return configs.schema() == null &&
-            configs.extractColumnName() == null &&
-            !configs.isAutoGenerateColumnNames();
+                configs.extractColumnName() == null &&
+                !configs.isAutoGenerateColumnNames();
     }
 
     /**

@@ -87,52 +105,70 @@ public ConfigDef configDef() {
      */
     @Override
     public RecordsIterable<TypedStruct> apply(final FilterContext context,
-                                            final TypedStruct record,
-                                            final boolean hasNext) throws FilterException {
+                                              final TypedStruct record,
+                                              final boolean hasNext) throws FilterException {
 
         final String source = record.first(DEFAULT_SOURCE_FIELD).getString();
 
-        String[] fieldValues = splitFields(source);
-        final StructSchema schema = getSchema(record, fieldValues.length);
-        final TypedStruct struct = buildStructForFields(fieldValues, schema);
+        String[] columnValues = splitColumnValues(source);
+        if (schema == null) {
+            inferSchemaFromRecord(record, columnValues.length);
+        }
+        final TypedStruct struct = buildStructForFields(columnValues);
         return RecordsIterable.of(struct);
     }
 
-    private StructSchema getSchema(final TypedStruct record, int n) {
-        if (schema != null) return schema;
-
+    private void inferSchemaFromRecord(final TypedStruct record, int numColumns) {
         schema = Schema.struct();
+
         if (configs.extractColumnName() != null) {
             final String fieldName = configs.extractColumnName();
             String field = record.first(fieldName).getString();
             if (field == null) {
                 throw new FilterException(
-                    "Can't found field for name '" + fieldName + "' to determine columns names");
+                        "Can't found field for name '" + fieldName + "' to determine columns names");
             }
-            final String[] columns = splitFields(field);
+            final List<String> columns = Arrays
+                    .stream(splitColumnValues(field))
+                    .map(String::trim)
+                    .collect(Collectors.toList());
+
+            if (configs.isDuplicateColumnsAsArray()) {
+                columns.stream()
+                    .collect(Collectors.groupingBy(Function.identity(), Collectors.<String>counting()))
+                    .entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> {
+                        return e.getValue() > 1 ? Schema.array(DEFAULT_COLUMN_TYPE) : DEFAULT_COLUMN_TYPE;
+                    }))
+                    .forEach(schema::field);
+            } else {
+                columns.forEach(columnName -> schema.field(columnName, DEFAULT_COLUMN_TYPE));
+            }
+            IntStream.range(0, columns.size()).forEach(i -> columnsTypesByIndex.put(i, schema.field(columns.get(i))));
+            return;
+        }
 
-            for (String column : columns) {
-                schema.field(column, DEFAULT_COLUMN_TYPE);
+        if (configs.isAutoGenerateColumnNames()) {
+            for (int i = 0; i < numColumns; i++) {
+                final String fieldName = AUTO_GENERATED_COLUMN_NAME_PREFIX + (i + 1);
+                schema.field(fieldName, DEFAULT_COLUMN_TYPE);
+                columnsTypesByIndex.put(i, schema.field(fieldName));
             }
-        } else if (configs.isAutoGenerateColumnNames()) {
-            for (int i = 0; i < n; i++) {
-                schema.field(AUTO_GENERATED_COLUMN_NAME_PREFIX + (i + 1), DEFAULT_COLUMN_TYPE);
-            }
-        } else {
-            throw new FilterException("Can't found valid configuration to determine schema for input value");
+            return;
         }
-        return schema;
+
+        throw new FilterException("Can't found valid configuration to determine schema for input value");
     }
 
-    private String[] splitFields(final String value) {
-        return value.split(configs.delimiter());
+    private String[] splitColumnValues(final String value) {
+        return pattern != null ? pattern.split(value) : value.split(configs.delimiter());
    }
 
-    private TypedStruct buildStructForFields(final String[] fieldValues, final StructSchema schema) {
-        List<TypedField> fields = schema.fields();
-        if (fieldValues.length > fields.size()) {
+    private TypedStruct buildStructForFields(final String[] fieldValues) {
+        if (fieldValues.length > columnsTypesByIndex.size()) {
             throw new FilterException(
-                "Error while reading delimited input row. Too large number of fields (" + fieldValues.length + ")");
+                    "Error while reading delimited input row. Too large number of fields (" + fieldValues.length + ")");
         }
 
         TypedStruct struct = TypedStruct.create();

@@ -141,9 +177,16 @@ private TypedStruct buildStructForFields(final String[] fieldValues, final Struc
             if (configs.isTrimColumn()) {
                 fieldValue = fieldValue.trim();
             }
-            TypedField field = fields.get(i);
+            TypedField field = columnsTypesByIndex.get(i);
             final Type type = field.type();
-            struct = struct.put(field.name(), type, type.convert(fieldValue));
+            if (type == Type.ARRAY) {
+                if (!struct.exists(field.name())) {
+                    struct.put(field.name(), new ArrayList<>());
+                }
+                struct.getArray(field.name()).add(fieldValue); // it seems to be OK to use type conversion here
+            } else {
+                struct = struct.put(field.name(), type, type.convert(fieldValue));
+            }
         }
         return struct;
     }
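To make the schema-inference branch concrete, here is a self-contained sketch (not part of the commit) of the same groupingBy/counting pipeline, with plain strings standing in for the Schema objects that inferSchemaFromRecord actually builds.

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateColumnGroupingSketch {

    public static void main(String[] args) {
        // Column names parsed from a header row such as "ts;level;message;message".
        final List<String> columns = List.of("ts", "level", "message", "message");

        // Count each name, then map names seen more than once to an array type.
        final Map<String, String> inferred = columns.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        e -> e.getValue() > 1 ? "ARRAY<STRING>" : "STRING"));

        System.out.println(inferred); // e.g. {ts=STRING, level=STRING, message=ARRAY<STRING>}
    }
}

In the filter itself, every index of a duplicated column maps to the same ARRAY field via columnsTypesByIndex, so buildStructForFields appends each duplicate value to a single list.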
