Skip to content

Commit b308e70

Browse files
committed
fix(api): fix connect.value.schema is not used when sink topic is computed from record
1 parent 6aa8123 commit b308e70

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/TypedFileRecord.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.connect.source.SourceRecord;
2929

3030
import java.util.Map;
31+
import java.util.Optional;
3132
import java.util.function.Function;
3233

3334
import static io.streamthoughts.kafka.connect.filepulse.internal.StringUtils.isNotBlank;
@@ -64,15 +65,25 @@ public SourceRecord toSourceRecord(final Map<String, ?> sourcePartition,
6465
final ConnectSchemaMapperOptions options) {
6566
mapper.setKeepLeadingUnderscoreCharacters(options.isKeepSchemaLeadingUnderscore());
6667
final TypedStruct value = value();
68+
69+
final String targetTopic = isNotBlank(internalSourceRecordBuilder.topic()) ?
70+
internalSourceRecordBuilder.topic() :
71+
defaultTopic;
72+
73+
final Schema connectSchema = Optional
74+
.ofNullable(connectSchemaSupplier.apply(targetTopic))
75+
.orElse(connectSchemaSupplier.apply(defaultTopic));
76+
6777
final Schema valueSchema;
68-
final Schema connectSchema = connectSchemaSupplier.apply(
69-
isNotBlank(internalSourceRecordBuilder.topic()) ? internalSourceRecordBuilder.topic() : defaultTopic
70-
);
7178
if (options.isConnectSchemaMergeEnabled() && value != null) {
79+
// Convert dynamic StrutSchema to static Connect Schema
7280
Schema recordValueSchema = value.schema().map(mapper, false);
7381
if (connectSchema != null) {
82+
// Merge Record Connect Schema with either the previous one or the one
83+
// passed through the connector's config.
7484
valueSchema = SchemaMerger.merge(connectSchema, recordValueSchema);
7585
} else {
86+
// Use the Record Connect Schema
7687
valueSchema = recordValueSchema;
7788
}
7889
} else {

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private static Struct toConnectStruct(final Schema connectSchema, final TypedStr
226226
if (!isOptional) {
227227
throw new DataException(
228228
"Failed to convert record to connect data. " +
229-
"Missing required connectField '" + fieldName + "' for record '" + recordName + "'"
229+
"Missing required field '" + fieldName + "' for record '" + recordName + "'"
230230
);
231231
}
232232
continue;
@@ -243,7 +243,7 @@ private static Struct toConnectStruct(final Schema connectSchema, final TypedStr
243243
Schema.Type arrayValueType = connectFieldSchema.valueSchema().type();
244244
if (!arrayValueType.equals(dataSchemaType)) {
245245
throw new DataException(
246-
"Failed to convert record connectField '" +
246+
"Failed to convert record field '" +
247247
recordName + "." + fieldName + "' to connect data. " +
248248
"Types do not match Array[" + arrayValueType + "]<>Array[" + dataSchemaType + "]"
249249
);
@@ -264,9 +264,9 @@ else if (dataSchemaType.isPrimitive()) {
264264
}
265265
} else {
266266
throw new DataException(
267-
"Failed to convert record connectField '" +
268-
recordName + "." + fieldName + "' to connect data. " +
269-
"Types do not match " + schemaType + "<>" + dataSchemaType
267+
"Failed to convert record field '" +
268+
recordName + "." + fieldName + "' to connect data. " +
269+
"Types do not match " + schemaType + "<>" + dataSchemaType
270270
);
271271
}
272272
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,8 @@ public void start(final Map<String, String> props) {
100100
taskConfig = new SourceTaskConfig(configProperties);
101101
connectorGroupName = props.get(CONNECT_NAME_CONFIG);
102102
offsetPolicy = taskConfig.getSourceOffsetPolicy();
103-
valueSchemas.put(taskConfig.topic(), taskConfig.getValueConnectSchema());
104103
defaultTopic = taskConfig.topic();
105-
104+
valueSchemas.put(defaultTopic, taskConfig.getValueConnectSchema());
106105
try {
107106
sharedStore = new StateBackingStoreAccess(
108107
connectorGroupName,

0 commit comments

Comments
 (0)