Skip to content

Commit e64ccbf

Browse files
committed
feat(api): add method set schema namespace and doc
1 parent 3a153d4 commit e64ccbf

File tree

3 files changed

+80
-4
lines changed

3 files changed

+80
-4
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,26 @@ public class StructSchema implements Schema, Iterable<TypedField> {
3737

3838
private String name;
3939

40+
private String namespace;
41+
42+
private String doc;
43+
4044
/**
4145
* Creates a new {@link StructSchema} instance.
4246
*/
4347
public StructSchema() {
4448
this(Collections.emptyList(), null);
4549
}
4650

51+
/**
52+
* Creates a new {@link StructSchema} instance.
53+
*/
54+
public StructSchema(final StructSchema schema) {
55+
this(schema.fields(), schema.name);
56+
this.namespace = schema.namespace;
57+
this.doc = schema.doc;
58+
}
59+
4760
/**
4861
* Creates a new {@link StructSchema} instance.
4962
*
@@ -140,15 +153,67 @@ public Type type() {
140153
return this.type;
141154
}
142155

156+
/**
157+
* Gets the name for this schema.
158+
*
159+
* @return the schema name.
160+
*/
143161
public String name() {
144162
return this.name;
145163
}
146164

165+
/**
166+
* Sets the name for this schema.
167+
* @param name the schema name.
168+
*
169+
* @return {@code this}
170+
*/
147171
public StructSchema name(final String name) {
148172
this.name = name;
149173
return this;
150174
}
151175

176+
/**
177+
* Gets the namespace for this schema.
178+
*
179+
* @return the schema namespace.
180+
*/
181+
public String namespace() {
182+
return namespace;
183+
}
184+
185+
/**
186+
* Sets the namespace for this schema.
187+
*
188+
* @param namespace the namespace.
189+
*
190+
* @return {@code this}
191+
*/
192+
public StructSchema namespace(final String namespace) {
193+
this.namespace = namespace;
194+
return this;
195+
}
196+
197+
/**
198+
* Gets the doc for this schema.
199+
*
200+
* @return the doc.
201+
*/
202+
public String doc() {
203+
return this.doc;
204+
}
205+
206+
/**
207+
* Sets the doc for this schema.
208+
* @param doc the schema doc.
209+
*
210+
* @return {@code this}
211+
*/
212+
public StructSchema doc(final String doc) {
213+
this.doc = doc;
214+
return this;
215+
}
216+
152217
/**
153218
* {@inheritDoc}
154219
*/

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/TypedValue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public static TypedValue float32(final Float value) {
126126
*/
127127
public static TypedValue struct(final TypedStruct value) {
128128
return new TypedValue(
129-
new StructSchema(value.schema().fields(), value.schema().name()),
129+
new StructSchema(value.schema()),
130130
value
131131
);
132132
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/internal/ConnectSchemaMapper.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,23 @@ public Schema map(final ArraySchema schema) {
7070
@Override
7171
public Schema map(final StructSchema schema) {
7272
SchemaBuilder sb = SchemaBuilder.struct();
73+
74+
String schemaName = schema.name();
75+
if (schemaName != null) {
76+
if (schema.namespace() != null) {
77+
schemaName = schema.namespace() + "." + schemaName;
78+
}
79+
sb.name(schemaName);
80+
}
81+
82+
if (schema.doc() != null) {
83+
sb.doc(schema.doc());
84+
}
85+
7386
for(final TypedField field : schema) {
7487
sb.field(field.name(), field.schema().map(this)).optional();
7588
}
76-
if (schema.name() != null) {
77-
sb.name(schema.name());
78-
}
89+
7990
return sb.build();
8091
}
8192

0 commit comments

Comments
 (0)