Skip to content

Commit 2877c33

Browse files
committed
fix(filters): fix JSONFilter not supporting null field (#71)
This commit improves support for NULL field with no explicit schema. Resolves: #71
1 parent b0f6f29 commit 2877c33

File tree

6 files changed

+48
-7
lines changed

6 files changed

+48
-7
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedValue.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,8 @@ public boolean isEmpty() {
358358
}
359359

360360
public TypedValue as(final Type type) {
361-
return TypedValue.of(type.convert(value), type);
361+
Object converted = isNull() ? null : type.convert(value);
362+
return TypedValue.of(converted, type);
362363
}
363364

364365
/**

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,13 @@ public Schema map(final StructSchema schema) {
9696

9797
for(final TypedField field : schema) {
9898
final io.streamthoughts.kafka.connect.filepulse.data.Schema fieldSchema = field.schema();
99-
final String fieldName = field.name();
100-
mayUpdateSchemaName(fieldSchema, fieldName);
101-
sb.field(fieldName, fieldSchema.map(this)).optional();
99+
// Ignore schema NULL because cannot determine the expected type.
100+
if (fieldSchema.type() != Type.NULL) {
101+
final String fieldName = field.name();
102+
mayUpdateSchemaName(fieldSchema, fieldName);
103+
sb.field(fieldName, fieldSchema.map(this)).optional();
104+
}
102105
}
103-
104106
return sb.build();
105107
}
106108

connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.source.internal;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
2122
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2223
import org.apache.kafka.connect.data.SchemaAndValue;
2324
import org.apache.kafka.connect.data.Struct;
@@ -124,4 +125,18 @@ public void shouldMapGivenTypeStructWithMapWithStructValue() {
124125
Assert.assertEquals("value", field1.get("field2").getString("field3"));
125126
}
126127

128+
@Test
129+
public void shouldMapGivenTypeStructWithNullValue() {
130+
TypedStruct struct = TypedStruct.create()
131+
.put("field1", "value1")
132+
.put("field2", Schema.none(), null);
133+
134+
SchemaAndValue schemaAndValue = struct.schema().map(ConnectSchemaMapper.INSTANCE, struct);
135+
Assert.assertNotNull(schemaAndValue);
136+
137+
Struct connectStruct = (Struct)schemaAndValue.value();
138+
Assert.assertNotNull(connectStruct.schema().field("field1"));
139+
Assert.assertNull(connectStruct.schema().field("field2"));
140+
}
141+
127142
}

connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/Converts.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public SimpleArguments prepare(final TypedValue[] args) {
5757
*/
5858
@Override
5959
public TypedValue apply(final TypedValue field, final SimpleArguments args) {
60-
Type type = args.valueOf(TYPE);
61-
return TypedValue.of(type.convert(field.value()), type);
60+
return field.as(args.valueOf(TYPE));
6261
}
6362
}

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/json/DefaultJSONStructConverter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.jsoniter.JsonIterator;
2222
import com.jsoniter.ValueType;
2323
import com.jsoniter.any.Any;
24+
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
2425
import io.streamthoughts.kafka.connect.filepulse.data.Type;
2526
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2627
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
@@ -45,6 +46,7 @@ public DefaultJSONStructConverter() {
4546
ACCESSORS.put(ValueType.OBJECT, new ObjectJsonFieldAccessor());
4647
ACCESSORS.put(ValueType.BOOLEAN, new BooleanJsonFieldAccessor());
4748
ACCESSORS.put(ValueType.NUMBER, new NumberJsonFieldAccessor());
49+
ACCESSORS.put(ValueType.NULL, new NullJsonFieldAccessor());
4850
}
4951

5052
private static JsonFieldAccessor<?> getAccessorForType(final ValueType type) {
@@ -83,6 +85,14 @@ private interface JsonFieldAccessor<T> {
8385
TypedValue read(final JsonIterator it) throws IOException;
8486
}
8587

88+
private static class NullJsonFieldAccessor implements JsonFieldAccessor<Object> {
89+
90+
@Override
91+
public TypedValue read(JsonIterator it) {
92+
return TypedValue.of(null, Schema.none());
93+
}
94+
}
95+
8696
private static class NumberJsonFieldAccessor implements JsonFieldAccessor<Object> {
8797

8898
@Override

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/json/DefaultJSONStructConverterTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,18 @@ public void shouldConvertGivenFieldsWithNumberType() throws Exception {
127127
assertEquals(Float.MAX_VALUE, struct.getDouble("field-float").floatValue(), 0.0);
128128
}
129129

130+
@Test
131+
public void shouldConvertGivenFieldWithNullValue() throws Exception {
132+
TypedValue value = converter.readJson("{\"field-one\" : \"one\", \"field-two\":null}");
133+
Assert.assertNotNull(value);
134+
Assert.assertEquals(Type.STRUCT, value.type());
135+
TypedStruct struct = value.getStruct();
136+
137+
StructSchema schema = struct.schema();
138+
assertEquals(2, schema.fields().size());
139+
assertNotNull(schema.field("field-one"));
140+
assertEquals(Type.STRING, schema.field("field-one").schema().type());
141+
assertNotNull(schema.field("field-two"));
142+
assertEquals(Type.NULL, schema.field("field-two").schema().type());
143+
}
130144
}

0 commit comments

Comments
 (0)