1717package io .cdap .delta .plugin .common ;
1818
1919import io .cdap .cdap .api .data .format .StructuredRecord ;
20+ import io .cdap .cdap .api .data .format .UnexpectedFormatException ;
2021import io .cdap .cdap .api .data .schema .Schema ;
2122import io .cdap .delta .api .SourceColumn ;
2223import io .debezium .jdbc .JdbcValueConverters ;
3738import java .math .BigDecimal ;
3839import java .time .Instant ;
3940import java .time .LocalDate ;
41+ import java .time .LocalDateTime ;
4042import java .time .LocalTime ;
4143import java .time .ZoneOffset ;
4244import java .time .ZonedDateTime ;
45+ import java .time .format .DateTimeParseException ;
4346import java .util .ArrayList ;
4447import java .util .List ;
4548import java .util .Map ;
@@ -157,6 +160,7 @@ public static StructuredRecord convert(Struct struct) {
157160 for (Schema .Field field : schema .getFields ()) {
158161 String fieldName = field .getName ();
159162 Field debeziumField = struct .schema ().field (fieldName );
163+ String debeziumSchemaName = debeziumField .schema ().name ();
160164 Object val = convert (debeziumField .schema (), struct .get (fieldName ));
161165 Schema fieldSchema = field .getSchema ();
162166 fieldSchema = fieldSchema .isNullable () ? fieldSchema .getNonNullable () : fieldSchema ;
@@ -168,6 +172,34 @@ public static StructuredRecord convert(Struct struct) {
168172 case DATE :
169173 builder .setDate (fieldName , LocalDate .ofEpochDay ((int ) val ));
170174 break ;
175+ case DATETIME :
176+ long value = (long ) val ;
177+ try {
178+ LocalDateTime localDateTime ;
179+ if (NanoTimestamp .SCHEMA_NAME .equals (debeziumSchemaName )) {
180+ // DATETIME2(7) from SQL Server is represented as io.debezium.time.NanoTimestamp
181+ // which is the number of nanoseconds past the epoch, and does not include timezone information.
182+ localDateTime = LocalDateTime .ofInstant (Instant .ofEpochSecond (0L , value ), ZoneOffset .UTC );
183+ } else if (MicroTimestamp .SCHEMA_NAME .equals (debeziumSchemaName )) {
184+ // DATETIME2(4), DATETIME2(5), DATETIME2(6) from SQL Server and mysql are represented as
185+ // io.debezium.time.MicroTimestamp, which is the number of microseconds past the epoch, and does
186+ // not include timezone information.
187+ localDateTime
188+ = LocalDateTime .ofInstant (Instant .ofEpochSecond (0L , TimeUnit .MICROSECONDS .toNanos (value )),
189+ ZoneOffset .UTC );
190+ } else {
191+ // DATETIME, SMALLDATETIME, DATETIME2(0), DATETIME2(1), DATETIME2(2), DATETIME2(3) from SQL Server
192+ // and mysql are represented as io.debezium.time.Timestamp, which is the number of milliseconds
193+ // past the epoch, and does not include timezone information
194+ localDateTime = LocalDateTime .ofInstant (Instant .ofEpochMilli (value ), ZoneOffset .UTC );
195+ }
196+ builder .setDateTime (fieldName , localDateTime );
197+ } catch (DateTimeParseException exception ) {
198+ throw new UnexpectedFormatException (
199+ String .format ("Field '%s' of type '%s' with value '%s' is not in ISO-8601 format." ,
200+ field .getName (), debeziumSchemaName , val ), exception );
201+ }
202+ break ;
171203 case TIMESTAMP_MILLIS :
172204 builder .setTimestamp (fieldName , getZonedDateTime ((long ) val , TimeUnit .MILLISECONDS ));
173205 break ;
@@ -178,7 +210,17 @@ public static StructuredRecord convert(Struct struct) {
178210 builder .setTime (fieldName , LocalTime .ofNanoOfDay (TimeUnit .MILLISECONDS .toNanos ((int ) val )));
179211 break ;
180212 case TIME_MICROS :
181- builder .setTime (fieldName , LocalTime .ofNanoOfDay (TimeUnit .MICROSECONDS .toNanos ((long ) val )));
213+ LocalTime localTime ;
214+ if (NanoTime .SCHEMA_NAME .equals (debeziumSchemaName )) {
215+ // TIME(7) from SQL server represents the number of nanoseconds past midnight, and does not include
216+ // timezone information.
217+ localTime = LocalTime .ofNanoOfDay ((long ) val );
218+ } else {
219+ // TIME(4), TIME(5), TIME(6) which represents the number of microseconds past midnight,
220+ // and does not include timezone information.
221+ localTime = LocalTime .ofNanoOfDay (TimeUnit .MICROSECONDS .toNanos ((long ) val ));
222+ }
223+ builder .setTime (fieldName , localTime );
182224 break ;
183225 case DECIMAL :
184226 builder .setDecimal (fieldName , (BigDecimal ) val );
@@ -278,10 +320,9 @@ public static Schema convert(org.apache.kafka.connect.data.Schema schema) {
278320 NanoTime .SCHEMA_NAME .equals (schema .name ())) {
279321 converted = Schema .of (Schema .LogicalType .TIME_MICROS );
280322 } else if (MicroTimestamp .SCHEMA_NAME .equals (schema .name ()) ||
281- NanoTimestamp .SCHEMA_NAME .equals (schema .name ())) {
282- converted = Schema .of (Schema .LogicalType .TIMESTAMP_MICROS );
283- } else if (Timestamp .SCHEMA_NAME .equals (schema .name ())) {
284- converted = Schema .of (Schema .LogicalType .TIMESTAMP_MILLIS );
323+ NanoTimestamp .SCHEMA_NAME .equals (schema .name ()) ||
324+ Timestamp .SCHEMA_NAME .equals (schema .name ())) {
325+ converted = Schema .of (Schema .LogicalType .DATETIME );
285326 } else {
286327 converted = Schema .of (Schema .Type .LONG );
287328 }
0 commit comments