
Commit 8b1491b

feat(plugin): allow to keep leading underscores on field names
1 parent ffdb9ab commit 8b1491b

File tree: 6 files changed (+100, -30 lines)


connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileRecord.java

Lines changed: 23 additions & 1 deletion
@@ -64,5 +64,27 @@ SourceRecord toSourceRecord(
             final String defaultTopic,
             final Integer defaultPartition,
             final Function<String, Schema> connectSchemaSupplier,
-            final boolean connectSchemaMergeEnabled);
+            final ConnectSchemaMapperOptions options
+    );
+
+    class ConnectSchemaMapperOptions {
+
+        private final boolean connectSchemaMergeEnabled;
+        private final boolean keepSchemaLeadingUnderscore;
+
+        public ConnectSchemaMapperOptions(final boolean connectSchemaMergeEnabled,
+                                          final boolean keepSchemaLeadingUnderscore) {
+            this.connectSchemaMergeEnabled = connectSchemaMergeEnabled;
+            this.keepSchemaLeadingUnderscore = keepSchemaLeadingUnderscore;
+        }
+
+        public boolean isConnectSchemaMergeEnabled() {
+            return connectSchemaMergeEnabled;
+        }
+
+        public boolean isKeepSchemaLeadingUnderscore() {
+            return keepSchemaLeadingUnderscore;
+        }
+    }
 }
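
The nested ConnectSchemaMapperOptions class replaces the former boolean connectSchemaMergeEnabled parameter of toSourceRecord, so both flags travel together. A minimal caller-side sketch (the literal flag values are illustrative; in the plugin they are read from the task configuration, as shown in FilePulseSourceTask further down):

    // Illustrative only: in practice both flags come from the connector/task configuration.
    FileRecord.ConnectSchemaMapperOptions options =
            new FileRecord.ConnectSchemaMapperOptions(
                    true,  // connectSchemaMergeEnabled
                    true   // keepSchemaLeadingUnderscore
            );
    // The options instance is then passed where the single boolean used to be:
    // record.toSourceRecord(..., defaultTopic, defaultPartition, connectSchemaSupplier, options);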

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/TypedFileRecord.java

Lines changed: 3 additions & 3 deletions
@@ -61,14 +61,14 @@ public SourceRecord toSourceRecord(final Map<String, ?> sourcePartition,
                                        final String defaultTopic,
                                        final Integer defaultPartition,
                                        final Function<String, Schema> connectSchemaSupplier,
-                                       final boolean connectSchemaMergeEnabled) {
-
+                                       final ConnectSchemaMapperOptions options) {
+        mapper.setKeepLeadingUnderscoreCharacters(options.isKeepSchemaLeadingUnderscore());
         final TypedStruct value = value();
         final Schema valueSchema;
         final Schema connectSchema = connectSchemaSupplier.apply(
             isNotBlank(internalSourceRecordBuilder.topic()) ? internalSourceRecordBuilder.topic() : defaultTopic
         );
-        if (connectSchemaMergeEnabled && value != null) {
+        if (options.isConnectSchemaMergeEnabled() && value != null) {
             Schema recordValueSchema = value.schema().map(mapper, false);
             if (connectSchema != null) {
                 valueSchema = SchemaMerger.merge(connectSchema, recordValueSchema);

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java

Lines changed: 30 additions & 10 deletions
@@ -18,6 +18,7 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.source.internal;
 
+import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
 import io.streamthoughts.kafka.connect.filepulse.data.ArraySchema;
 import io.streamthoughts.kafka.connect.filepulse.data.DataException;
 import io.streamthoughts.kafka.connect.filepulse.data.MapSchema;

@@ -55,14 +56,32 @@ public class ConnectSchemaMapper implements SchemaMapper<Schema>, SchemaMapperWi
 
     private final SchemaContext context = new SchemaContext();
 
-    static String normalizeSchemaName(final String name) {
-        return Arrays
-                .stream(REGEX.split(name))
+    private boolean keepLeadingUnderscoreCharacters = false;
+
+    @VisibleForTesting
+    String normalizeSchemaName(final String name) {
+        String toNormalize = name;
+        StringBuilder prefix = new StringBuilder();
+        if (keepLeadingUnderscoreCharacters) {
+            StringBuilder sb = new StringBuilder(name);
+            while (sb.length() > 0 && sb.charAt(0) == '_') {
+                prefix.append("_");
+                sb.deleteCharAt(0);
+            }
+            toNormalize = sb.toString();
+
+        }
+        return prefix + Arrays
+                .stream(REGEX.split(toNormalize))
                 .filter(s -> !s.isEmpty())
                 .map(it -> it.substring(0, 1).toUpperCase() + it.substring(1))
                 .collect(Collectors.joining());
     }
 
+    public void setKeepLeadingUnderscoreCharacters(final boolean keepLeadingUnderscoreCharacters) {
+        this.keepLeadingUnderscoreCharacters = keepLeadingUnderscoreCharacters;
+    }
+
     /**
      * {@inheritDoc}
      */

@@ -118,6 +137,7 @@ public Schema map(final StructSchema schema, final boolean optional) {
         }
         return context.buildSchemaWithCyclicSchemaWrapper(sb.build());
     }
+
     private void mayUpdateSchemaWithName(final io.streamthoughts.kafka.connect.filepulse.data.Schema schema,
                                          final String schemaName) {
         if (schema.type() == Type.ARRAY) {

@@ -206,7 +226,7 @@ private static Struct toConnectStruct(final Schema connectSchema, final TypedStr
(whitespace-only change)
             if (!isOptional) {
                 throw new DataException(
                         "Failed to convert record to connect data. " +
-                        "Missing required connectField '" + fieldName + "' for record '" + recordName + "'"
+                        "Missing required connectField '" + fieldName + "' for record '" + recordName + "'"
                 );
             }
             continue;

@@ -235,18 +255,18 @@ else if (dataSchemaType.isPrimitive()) {
(whitespace-only change)
             final boolean isNumber = typed.type().isNumber();
             if (schemaType == Schema.Type.STRING) {
                 typed = typed.as(Type.STRING);
-            // handle INTEGER/LONG -> DOUBLE
+            // handle INTEGER/LONG -> DOUBLE
             } else if (schemaType == Schema.Type.FLOAT64 && isNumber) {
                 typed = typed.as(Type.DOUBLE);
-            // handle INTEGER -> LONG
+            // handle INTEGER -> LONG
             } else if (schemaType == Schema.Type.INT64 && typed.type() == Type.INTEGER) {
                 typed = typed.as(Type.LONG);
             }
         } else {
             throw new DataException(
                     "Failed to convert record connectField '" +
-                            recordName + "." + fieldName + "' to connect data. " +
-                            "Types do not match " + schemaType + "<>" + dataSchemaType
+                            recordName + "." + fieldName + "' to connect data. " +
+                            "Types do not match " + schemaType + "<>" + dataSchemaType
             );
         }
     }

@@ -276,8 +296,8 @@ private static Object toConnectObject(final Schema schema, final TypedValue type
(whitespace-only change)
             return typed.getMap().entrySet()
                     .stream()
                     .collect(Collectors.toMap(
-                            e -> toConnectObject(connectKeySchema, TypedValue.of(e.getKey(), keySchema)),
-                            e -> toConnectObject(connectValueSchema, TypedValue.of(e.getValue(), valueSchema))
+                            e -> toConnectObject(connectKeySchema, TypedValue.of(e.getKey(), keySchema)),
+                            e -> toConnectObject(connectValueSchema, TypedValue.of(e.getValue(), valueSchema))
                     )
             );
         }
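
Read together, the reworked normalizeSchemaName splits its input into a run of leading underscores (kept only when setKeepLeadingUnderscoreCharacters(true) has been called) and a camel-cased remainder. A small sketch of the observable behaviour, grounded in the unit tests below (the method is package-private, so these calls assume same-package access):

    ConnectSchemaMapper mapper = new ConnectSchemaMapper();
    mapper.normalizeSchemaName("__foo_bar");  // "FooBar"   -- default: leading underscores dropped

    mapper.setKeepLeadingUnderscoreCharacters(true);
    mapper.normalizeSchemaName("__foo_bar");  // "__FooBar" -- underscore prefix preserved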

connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapperTest.java

Lines changed: 13 additions & 5 deletions
@@ -162,11 +162,19 @@ public void should_map_given_type_struct_with_empty_array() {
     }
 
     @Test
-    public void test_normalize_schema_name() {
-        Assert.assertEquals("Foo", ConnectSchemaMapper.normalizeSchemaName("foo"));
-        Assert.assertEquals("FooBar", ConnectSchemaMapper.normalizeSchemaName("foo_bar"));
-        Assert.assertEquals("FooBar", ConnectSchemaMapper.normalizeSchemaName("foo.bar"));
-        Assert.assertEquals("FooBar", ConnectSchemaMapper.normalizeSchemaName("_foo_bar"));
+    public void test_normalize_schema_name_given_leading_underscore_false() {
+        var mapper = new ConnectSchemaMapper();
+        Assert.assertEquals("Foo", mapper.normalizeSchemaName("foo"));
+        Assert.assertEquals("FooBar", mapper.normalizeSchemaName("foo_bar"));
+        Assert.assertEquals("FooBar", mapper.normalizeSchemaName("foo.bar"));
+        Assert.assertEquals("FooBar", mapper.normalizeSchemaName("__foo_bar"));
+    }
+
+    @Test
+    public void test_normalize_schema_name_given_leading_underscore_true() {
+        var mapper = new ConnectSchemaMapper();
+        mapper.setKeepLeadingUnderscoreCharacters(true);
+        Assert.assertEquals("__FooBar", mapper.normalizeSchemaName("__foo_bar"));
     }
 
     @Test

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java

Lines changed: 21 additions & 4 deletions
@@ -81,16 +81,19 @@ public class CommonSourceConfig extends AbstractConfig {
     private static final String TASKS_FILE_STATUS_STORAGE_CLASS_DOC = "The FileObjectStateBackingStore class to be used for storing status state of file objects.";
 
     public static final String TASK_PARTITIONER_CLASS_CONFIG = "task.partitioner.class";
-    private static final String TASK_PARTITIONER_CLASS_DOC = "The TaskPartitioner to be used for partitioning files to tasks";
+    private static final String TASK_PARTITIONER_CLASS_DOC = "The TaskPartitioner to be used for partitioning files to tasks";
 
     public static final String RECORD_VALUE_SCHEMA_CONFIG = "value.connect.schema";
-    public static final String RECORD_VALUE_SCHEMA_DOC = "The schema for the record-value";
+    private static final String RECORD_VALUE_SCHEMA_DOC = "The schema for the record-value";
 
     public static final String RECORD_VALUE_SCHEMA_MERGE_ENABLE_CONFIG = "merge.value.connect.schemas";
-    public static final String RECORD_VALUE_SCHEMA_MERGE_ENABLE_DOC = "Specify if schemas deriving from record-values should be recursively merged. " +
+    private static final String RECORD_VALUE_SCHEMA_MERGE_ENABLE_DOC = "Specify if schemas deriving from record-values should be recursively merged. " +
             "If set to true, then schemas deriving from a record will be merged with the schema of the last produced record. " +
             "If `value.connect.schema` is set, then the provided schema will be merged with the schema deriving from the first generated record.";
 
+    private static final String CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG = "connect.schema.keep.leading.underscores.on.record.name";
+    private static final String CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_DOC = "Specify if leading underscores should be kept when generating schema record name.";
+
     /**
      * Creates a new {@link CommonSourceConfig} instance.
      */

@@ -107,7 +110,6 @@ public static ConfigDef getConfigDev() {
                         ConfigDef.Importance.HIGH,
                         FS_LISTING_CLASS_DOC
                 )
-
                 .define(
                         FS_LISTING_FILTERS_CONFIG,
                         ConfigDef.Type.LIST,

@@ -233,6 +235,17 @@ public static ConfigDef getConfigDev() {
                         groupCounter++,
                         ConfigDef.Width.NONE,
                         TASKS_FILE_PROCESSING_ORDER_BY_CONFIG
+                )
+                .define(
+                        CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG,
+                        ConfigDef.Type.BOOLEAN,
+                        false,
+                        ConfigDef.Importance.LOW,
+                        CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_DOC,
+                        FILTERS_GROUP,
+                        groupCounter++,
+                        ConfigDef.Width.NONE,
+                        CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG
                 );
     }
 

@@ -279,6 +292,10 @@ public boolean isValueConnectSchemaMergeEnabled() {
         return getBoolean(RECORD_VALUE_SCHEMA_MERGE_ENABLE_CONFIG);
     }
 
+    public boolean isSchemaKeepLeadingUnderscoreOnFieldName() {
+        return getBoolean(CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG);
+    }
+
     private Schema readSchema(final String key) {
         final String schema = this.getString(key);

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 10 additions & 7 deletions
@@ -165,16 +165,16 @@ public List<SourceRecord> poll() throws InterruptedException {
(whitespace-only change)
                 // if no file is immediately available. In this case, this method should
                 // be blocked before returning.
                 if (!consumer.hasNext() &&
-                    consecutiveWaits.checkAndDecrement()) {
+                    consecutiveWaits.checkAndDecrement()) {
                     // Check if the SourceTask is still running to
                     // return immediately instead of waiting
                     if (running.get()) busyWait();
                     continue;
                 }
             } else {
                 LOG.info(
-                    "Completed all object files. FilePulse source task is transitioning to " +
-                    "IDLE state while waiting for new reconfiguration request from source connector."
+                    "Completed all object files. FilePulse source task is transitioning to " +
+                    "IDLE state while waiting for new reconfiguration request from source connector."
                 );
                 running.set(false);
 

@@ -283,7 +283,10 @@ private SourceRecord buildSourceRecord(final FileContext context,
                 defaultTopic,
                 null,
                 valueSchemas::get,
-                taskConfig.isValueConnectSchemaMergeEnabled()
+                new FileRecord.ConnectSchemaMapperOptions(
+                        taskConfig.isValueConnectSchemaMergeEnabled(),
+                        taskConfig.isSchemaKeepLeadingUnderscoreOnFieldName()
+                )
         );
 
         if (taskConfig.isValueConnectSchemaMergeEnabled()) {

@@ -294,8 +297,8 @@ private SourceRecord buildSourceRecord(final FileContext context,
(whitespace-only change)
 
         } catch (final Throwable t) {
             throw new ConnectFilePulseException(
-                "Failed to convert data into connect record: '" + context.metadata().uri() + "'",
-                t
+                "Failed to convert data into connect record: '" + context.metadata().uri() + "'",
+                t
             );
         }
     }

@@ -332,7 +335,7 @@ private void closeResources() {
(whitespace-only change)
         if (consumer != null) {
             try {
                 consumer.close();
-            } catch (final Throwable t) {
+            } catch (final Throwable t) {
                 LOG.warn("Failed to close FileRecordsPollingConsumer. Error: {}", t.getMessage());
             }
         }
