1919package io .streamthoughts .kafka .connect .filepulse .filter ;
2020
2121import io .streamthoughts .kafka .connect .filepulse .config .JSONFilterConfig ;
22+ import io .streamthoughts .kafka .connect .filepulse .data .ArraySchema ;
23+ import io .streamthoughts .kafka .connect .filepulse .data .Type ;
2224import io .streamthoughts .kafka .connect .filepulse .data .TypedStruct ;
2325import io .streamthoughts .kafka .connect .filepulse .data .TypedValue ;
2426import io .streamthoughts .kafka .connect .filepulse .json .DefaultJSONStructConverter ;
2527import io .streamthoughts .kafka .connect .filepulse .reader .RecordsIterable ;
2628import org .apache .kafka .common .config .ConfigDef ;
2729
28- import java .nio .charset .Charset ;
30+ import java .util .Collection ;
31+ import java .util .Collections ;
32+ import java .util .List ;
2933import java .util .Map ;
3034import java .util .Set ;
35+ import java .util .stream .Collectors ;
3136
3237public class JSONFilter extends AbstractMergeRecordFilter <JSONFilter > {
3338
3439 private final DefaultJSONStructConverter converter = new DefaultJSONStructConverter ();
3540
3641 private JSONFilterConfig configs ;
3742
38- private String source ;
39-
40- private String target ;
41-
42- private Charset charset ;
43-
4443 /**
4544 * {@inheritDoc}
4645 */
4746 @ Override
4847 public void configure (final Map <String , ?> props ) {
4948 super .configure (props );
5049 configs = new JSONFilterConfig (props );
51- source = configs .source ();
52- target = configs .target ();
53- charset = configs .charset ();
5450 }
5551
5652 /**
@@ -66,44 +62,95 @@ public ConfigDef configDef() {
6662 */
6763 @ Override
6864 protected RecordsIterable <TypedStruct > apply (final FilterContext context , final TypedStruct record ) {
69- final String value = extractJsonField (checkIsNotNull (record .get (source )));
65+ final String value = extractJsonField (checkIsNotNull (record .get (configs .source ())));
66+ final TypedValue typedValue ;
67+
7068 try {
71- final TypedStruct json = converter .readJson (value );
72- if (target != null ) {
73- record .put (target , json );
74- return RecordsIterable .of (record );
75- }
76- return RecordsIterable .of (json );
69+ typedValue = converter .readJson (value );
7770 } catch (Exception e ) {
7871 throw new FilterException (e .getLocalizedMessage (), e .getCause ());
7972 }
73+
74+ final Type type = typedValue .type ();
75+
76+ if (type != Type .ARRAY && type != Type .STRUCT ) {
77+ throw new FilterException (
78+ "Cannot process JSON value with unsupported type. Expected Array or Object, was " + type );
79+ }
80+
81+ if (type == Type .STRUCT && configs .merge ()) {
82+ return RecordsIterable .of (typedValue .getStruct ());
83+ }
84+
85+ if (type == Type .ARRAY ) {
86+ if (configs .explode ()) {
87+
88+ Collection <?> items = typedValue .getArray ();
89+ ArraySchema arraySchema = (ArraySchema )typedValue .schema ();
90+ Type arrayValueType = arraySchema .valueSchema ().type ();
91+
92+ if (configs .merge ()) {
93+ if (arrayValueType == Type .STRUCT ) {
94+ final List <TypedStruct > records = items
95+ .stream ()
96+ .map (it -> TypedValue .any (it ).getStruct ())
97+ .collect (Collectors .toList ());
98+ return new RecordsIterable <>(records );
99+ }
100+
101+ throw new FilterException (
102+ "Unsupported operation. Cannot merge array value of type '"
103+ + arrayValueType + "' into the top level of the input record" );
104+ }
105+
106+ final List <TypedStruct > records = items
107+ .stream ()
108+ .map (it -> TypedStruct .create ().put (targetField (), TypedValue .of (it , arrayValueType )))
109+ .collect (Collectors .toList ());
110+ return new RecordsIterable <>(records );
111+ }
112+
113+ if (configs .merge ()) {
114+ throw new FilterException (
115+ "Unsupported operation. Cannot merge JSON Array into the top level of the input record" );
116+ }
117+ }
118+
119+ return RecordsIterable .of (TypedStruct .create ().put (targetField (), typedValue ));
80120 }
81121
82122 private String extractJsonField (final TypedValue value ) {
83123 switch (value .type ()) {
84124 case STRING :
85125 return value .getString ();
86126 case BYTES :
87- return new String (value .getBytes (), charset );
127+ return new String (value .getBytes (), configs . charset () );
88128 default :
89129 throw new FilterException (
90- "Invalid field '" + source + "', cannot parse JSON field of type '" + value .type () + "'" );
130+ "Invalid field '" + configs .source () + "', cannot parse JSON field of type '" + value .type () + "'"
131+ );
91132 }
92133 }
93134
94135 private TypedValue checkIsNotNull (final TypedValue value ) {
95136 if (value .isNull ()) {
96137 throw new FilterException (
97- "Invalid field '" + source + "', cannot convert empty value to JSON" );
138+ "Invalid field '" + configs . source () + "', cannot convert empty value to JSON" );
98139 }
99140 return value ;
100141 }
101142
143+ private String targetField () {
144+ return configs .target () != null ? configs .target () : configs .source ();
145+ }
146+
102147 /**
103148 * {@inheritDoc}
104149 */
105150 @ Override
106151 protected Set <String > overwrite () {
107- return configs .overwrite ();
152+ return configs .target () == null && !configs .merge () ?
153+ Collections .singleton (configs .source ())
154+ : configs .overwrite () ;
108155 }
109156}
0 commit comments