Skip to content

Commit 61a890a

Browse files
committed
fix(filter): fix JSONFilter throwns NPE when parsing empty Array (#109)
Resolves: GH-109
1 parent 3638b21 commit 61a890a

File tree

10 files changed

+104
-11
lines changed

10 files changed

+104
-11
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/ArraySchema.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,22 @@ public <T> T map(final SchemaMapper<T> mapper) {
5858
return mapper.map(this);
5959
}
6060

61+
/**
62+
* {@inheritDoc}
63+
*/
6164
@Override
6265
public <T> T map(final SchemaMapperWithValue<T> mapper, final Object object) {
6366
return mapper.map(this, (Collection)object);
6467
}
6568

69+
/**
70+
* {@inheritDoc}
71+
*/
72+
@Override
73+
public boolean isResolvable() {
74+
return valueSchema.isResolvable();
75+
}
76+
6677
/**
6778
* {@inheritDoc}
6879
*/

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/LazyArraySchema.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,12 @@ public Schema valueSchema() {
5151
}
5252
return valueSchema;
5353
}
54+
55+
/**
56+
* {@inheritDoc}
57+
*/
58+
@Override
59+
public boolean isResolvable() {
60+
return valueSchema != null || !list.isEmpty();
61+
}
5462
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/LazyMapSchema.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ public class LazyMapSchema extends MapSchema implements Schema {
3535

3636
}
3737

38+
/**
39+
* {@inheritDoc}
40+
*/
3841
@Override
3942
public Schema valueSchema() {
4043
if (valueSchema == null) {
@@ -46,4 +49,12 @@ public Schema valueSchema() {
4649
}
4750
return valueSchema;
4851
}
52+
53+
/**
54+
* {@inheritDoc}
55+
*/
56+
@Override
57+
public boolean isResolvable() {
58+
return valueSchema!= null || !map.isEmpty();
59+
}
4960
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/MapSchema.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,22 @@ public <T> T map(final SchemaMapper<T> mapper) {
6565
return mapper.map(this);
6666
}
6767

68+
/**
69+
* {@inheritDoc}
70+
*/
6871
@Override
6972
public <T> T map(final SchemaMapperWithValue<T> mapper, final Object object) {
7073
return mapper.map(this, (Map<String, ?>)object);
7174
}
7275

76+
/**
77+
* {@inheritDoc}
78+
*/
79+
@Override
80+
public boolean isResolvable() {
81+
return valueSchema.isResolvable();
82+
}
83+
7384
/**
7485
* {@inheritDoc}
7586
*/

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/Schema.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,17 @@ static ArraySchema array(final Collection<?> value, final Schema valueSchema) {
152152
*/
153153
Type type() ;
154154

155+
/**
156+
* Checks whether this schemas is resolvable.
157+
*
158+
* @see LazyArraySchema
159+
* @see LazyMapSchema
160+
* @return {@code true}.
161+
*/
162+
default boolean isResolvable() {
163+
return true;
164+
}
165+
155166
/**
156167
* Maps this schema into a new type T.
157168
*

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.kafka.connect.data.SchemaAndValue;
3535
import org.apache.kafka.connect.data.SchemaBuilder;
3636
import org.apache.kafka.connect.data.Struct;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
3739

3840
import java.util.Arrays;
3941
import java.util.Collection;
@@ -44,6 +46,8 @@
4446

4547
public class ConnectSchemaMapper implements SchemaMapper<Schema>, SchemaMapperWithValue<SchemaAndValue> {
4648

49+
private static final Logger LOG = LoggerFactory.getLogger(ConnectSchemaMapper.class);
50+
4751
private static final Object DEFAULT_NULL_VALUE = null;
4852

4953
public static final ConnectSchemaMapper INSTANCE = new ConnectSchemaMapper();
@@ -96,12 +100,14 @@ public Schema map(final StructSchema schema) {
96100
}
97101

98102
for(final TypedField field : schema) {
103+
final String fieldName = field.name();
99104
final io.streamthoughts.kafka.connect.filepulse.data.Schema fieldSchema = field.schema();
100105
// Ignore schema NULL because cannot determine the expected type.
101-
if (fieldSchema.type() != Type.NULL) {
102-
final String fieldName = field.name();
106+
if (fieldSchema.type() != Type.NULL && fieldSchema.isResolvable()) {
103107
mayUpdateSchemaName(fieldSchema, fieldName);
104108
sb.field(fieldName, fieldSchema.map(this)).optional();
109+
} else {
110+
LOG.debug("Ignore field '{}', schema type is either NULL or cannot be resolved.", fieldName);
105111
}
106112
}
107113
return sb.build();

connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
public class ConnectSchemaMapperTest {
3333

3434
@Test
35-
public void shouldMapGivenSimpleTypedStruct() {
35+
public void should_map_given_simple_typed_struct() {
3636

3737
TypedStruct struct = TypedStruct.create()
3838
.put("field1", "value1")
@@ -47,7 +47,7 @@ public void shouldMapGivenSimpleTypedStruct() {
4747
}
4848

4949
@Test
50-
public void shouldMapGivenNestedTypedStruct() {
50+
public void should_map_given_nested_typed_struct() {
5151
TypedStruct struct = TypedStruct.create()
5252
.put("field1", TypedStruct.create().put("field2", "value2"));
5353

@@ -63,7 +63,7 @@ public void shouldMapGivenNestedTypedStruct() {
6363

6464
@Test
6565
@SuppressWarnings("unchecked")
66-
public void shouldMapGivenTypeStructWithArrayField() {
66+
public void should_map_given_type_struct_with_array_field() {
6767
TypedStruct struct = TypedStruct.create()
6868
.put("field1", Collections.singletonList("value"));
6969

@@ -78,7 +78,7 @@ public void shouldMapGivenTypeStructWithArrayField() {
7878

7979
@Test
8080
@SuppressWarnings("unchecked")
81-
public void shouldMapGivenTypeStructWithArrayOfStruct() {
81+
public void should_Map_given_type_struct_with_array_of_struct() {
8282
TypedStruct struct = TypedStruct.create()
8383
.put("field1", Collections.singletonList(TypedStruct.create().put("field2", "value")));
8484

@@ -108,7 +108,7 @@ public void shouldMapGivenTypeStructWithMapField() {
108108

109109
@Test
110110
@SuppressWarnings("unchecked")
111-
public void shouldMapGivenTypeStructWithMapWithStructValue() {
111+
public void should_map_given_type_struct_with_map_with_struct_value() {
112112
TypedStruct struct = TypedStruct.create()
113113
.put("field1",
114114
Collections.singletonMap(
@@ -126,7 +126,7 @@ public void shouldMapGivenTypeStructWithMapWithStructValue() {
126126
}
127127

128128
@Test
129-
public void shouldMapGivenTypeStructWithNullValue() {
129+
public void should_map_given_type_struct_with_null_value() {
130130
TypedStruct struct = TypedStruct.create()
131131
.put("field1", "value1")
132132
.put("field2", Schema.none(), null);
@@ -139,6 +139,20 @@ public void shouldMapGivenTypeStructWithNullValue() {
139139
Assert.assertNull(connectStruct.schema().field("field2"));
140140
}
141141

142+
@Test
143+
public void should_map_given_type_struct_with_empty_array() {
144+
TypedStruct struct = TypedStruct.create()
145+
.put("field1", "value1")
146+
.put("field2", Schema.array(Collections.emptyList(), null), Collections.emptyList());
147+
148+
SchemaAndValue schemaAndValue = struct.schema().map(ConnectSchemaMapper.INSTANCE, struct);
149+
Assert.assertNotNull(schemaAndValue);
150+
151+
Struct connectStruct = (Struct)schemaAndValue.value();
152+
Assert.assertNotNull(connectStruct.schema().field("field1"));
153+
Assert.assertNull(connectStruct.schema().field("field2"));
154+
}
155+
142156
@Test
143157
public void test_normalize_schema_name() {
144158
Assert.assertEquals("Foo", ConnectSchemaMapper.normalizeSchemaName("foo"));

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/json/DefaultJSONStructConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.jsoniter.ValueType;
2323
import com.jsoniter.any.Any;
2424
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
25+
import io.streamthoughts.kafka.connect.filepulse.data.SchemaSupplier;
2526
import io.streamthoughts.kafka.connect.filepulse.data.Type;
2627
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2728
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
@@ -147,7 +148,7 @@ public TypedValue read(JsonIterator it) throws IOException {
147148
type = read.type();
148149
array.add(read.value());
149150
}
150-
return TypedValue.array(array, type);
151+
return TypedValue.array(array, type != null ? Schema.of(type) : SchemaSupplier.lazy(array).get());
151152
}
152153
}
153154
private static class ObjectJsonFieldAccessor implements JsonFieldAccessor<TypedStruct> {

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/JSONFilterTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919
package io.streamthoughts.kafka.connect.filepulse.filter;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.config.JSONFilterConfig;
22+
import io.streamthoughts.kafka.connect.filepulse.data.SchemaSupplier;
2223
import io.streamthoughts.kafka.connect.filepulse.data.Type;
2324
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2425
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
26+
import io.streamthoughts.kafka.connect.filepulse.source.internal.ConnectSchemaMapper;
2527
import org.junit.Assert;
2628
import org.junit.Before;
2729
import org.junit.Test;
2830

2931
import java.nio.charset.StandardCharsets;
32+
import java.util.ArrayList;
3033
import java.util.Arrays;
3134
import java.util.Collections;
3235
import java.util.HashMap;
@@ -45,6 +48,7 @@ public class JSONFilterTest {
4548
private static final TypedStruct BYTES_RECORD = create().put("message", JSON.getBytes(StandardCharsets.UTF_8));
4649

4750
private static final TypedStruct ARRAY_STRING_RECORD = create().put("message", ARRAY_STRING_JSON);
51+
private static final TypedStruct ARRAY_EMPTY_RECORD = create().put("message", "[]");
4852
private static final TypedStruct ARRAY_STRUCT_RECORD = create().put("message", ARRAY_STRUCT_JSON);
4953

5054
private JSONFilter filter;
@@ -83,12 +87,28 @@ public void should_add_parsed_array_into_specific_field_given_target_field() {
8387
}});
8488

8589
List<TypedStruct> expected = Collections.singletonList(
86-
create().put("message", ARRAY_STRING_JSON).put("myTarget", TypedValue.array(Arrays.asList("foo", "bar"), Type.STRING))
90+
create().put("message", ARRAY_STRING_JSON)
91+
.put("myTarget", TypedValue.array(Arrays.asList("foo", "bar"), Type.STRING))
8792
);
8893

8994
assertOutput(filter.apply(null, ARRAY_STRING_RECORD, false).collect(), expected);
9095
}
9196

97+
@Test
98+
public void should_add_parsed_array_into_specific_field_given_target_field_and_empty_array() {
99+
filter.configure(new HashMap<String, Object>(){{
100+
put(JSONFilterConfig.JSON_TARGET_CONFIG, "myTarget");
101+
}});
102+
103+
List<TypedStruct> expected = Collections.singletonList(
104+
create().put("message", "[]")
105+
.put("myTarget", TypedValue.array(new ArrayList<>(), SchemaSupplier.lazy(new ArrayList<>()).get()))
106+
);
107+
108+
final List<TypedStruct> collect = filter.apply(null, ARRAY_EMPTY_RECORD, false).collect();
109+
assertOutput(collect, expected);
110+
}
111+
92112
@Test
93113
public void should_add_parsed_array_into_specific_field_given_target_and_explode_true() {
94114
filter.configure(new HashMap<String, Object>(){{

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/json/DefaultJSONStructConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
public class DefaultJSONStructConverterTest {
3636

37-
private JSONStructConverter converter = new DefaultJSONStructConverter();
37+
private final JSONStructConverter converter = new DefaultJSONStructConverter();
3838

3939
@Test
4040
public void shouldConvertGivenArrayOfComplexType() throws Exception {

0 commit comments

Comments
 (0)