@@ -458,14 +458,14 @@ private void testTimestampUnitResolution0(String mode) {
458458 }
459459
460460 @ Test
461- public void testTimestampSMT_parseMicroseconds_schemaLess () {
461+ public void testTimestampSMT_parseTimestamp_schemaLess () {
462462 connect .kafka ().createTopic (topicName , 1 );
463463 Map <String , String > props = baseConnectorProps (topicName );
464464 props .put ("value.converter.schemas.enable" , "false" );
465465 props .put (QuestDBSinkConnectorConfig .DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG , "born" );
466466 props .put (QuestDBSinkConnectorConfig .INCLUDE_KEY_CONFIG , "false" );
467467
468- String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS z" ;
468+ String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSS z" ;
469469 props .put ("transforms" , "Timestamp-born,Timestamp-death" );
470470 props .put ("transforms.Timestamp-born.type" , "org.apache.kafka.connect.transforms.TimestampConverter$Value" );
471471 props .put ("transforms.Timestamp-born.field" , "born" );
@@ -485,8 +485,8 @@ public void testTimestampSMT_parseMicroseconds_schemaLess() {
485485 "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)" ,
486486 QuestDBUtils .Endpoint .EXEC );
487487
488- String birthTimestamp = "1985-08-02 16:41:55.402095 UTC" ;
489- String deadTimestamp = "2023-08-02 16:41:55.402095 UTC" ;
488+ String birthTimestamp = "1985-08-02 16:41:55.402 UTC" ;
489+ String deadTimestamp = "2023-08-02 16:41:55.402 UTC" ;
490490 connect .kafka ().produce (topicName , "foo" ,
491491 "{\" firstname\" :\" John\" "
492492 + ",\" lastname\" :\" Doe\" "
@@ -495,18 +495,18 @@ public void testTimestampSMT_parseMicroseconds_schemaLess() {
495495 );
496496
497497 QuestDBUtils .assertSqlEventually (questDBContainer , "\" firstname\" ,\" lastname\" ,\" death\" ,\" born\" \r \n " +
498- "\" John\" ,\" Doe\" ,\" 2023-08-02T16:48:37.095000Z \" ,\" 1985-08-02T16:48:37.095000Z \" \r \n " ,
498+ "\" John\" ,\" Doe\" ,\" 2023-08-02T16:41:55.402000Z \" ,\" 1985-08-02T16:41:55.402000Z \" \r \n " ,
499499 "select * from " + topicName );
500500 }
501501
502502 @ Test
503- public void testTimestampSMT_parseMicroseconds_withSchema () {
503+ public void testTimestampSMT_parseTimestamp_withSchema () {
504504 connect .kafka ().createTopic (topicName , 1 );
505505 Map <String , String > props = baseConnectorProps (topicName );
506506 props .put (QuestDBSinkConnectorConfig .DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG , "born" );
507507 props .put (QuestDBSinkConnectorConfig .INCLUDE_KEY_CONFIG , "false" );
508508
509- String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS z" ;
509+ String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSS z" ;
510510 props .put ("transforms" , "Timestamp-born,Timestamp-death" );
511511 props .put ("transforms.Timestamp-born.type" , "org.apache.kafka.connect.transforms.TimestampConverter$Value" );
512512 props .put ("transforms.Timestamp-born.field" , "born" );
@@ -530,14 +530,14 @@ public void testTimestampSMT_parseMicroseconds_withSchema() {
530530 Struct struct = new Struct (schema )
531531 .put ("firstname" , "John" )
532532 .put ("lastname" , "Doe" )
533- .put ("born" , "1985-08-02 16:41:55.402095 UTC" )
534- .put ("death" , "2023-08-02 16:41:55.402095 UTC" );
533+ .put ("born" , "1985-08-02 16:41:55.402 UTC" )
534+ .put ("death" , "2023-08-02 16:41:55.402 UTC" );
535535
536536
537537 connect .kafka ().produce (topicName , "key" , new String (converter .fromConnectData (topicName , schema , struct )));
538538
539539 QuestDBUtils .assertSqlEventually (questDBContainer , "\" firstname\" ,\" lastname\" ,\" death\" ,\" timestamp\" \r \n " +
540- "\" John\" ,\" Doe\" ,\" 2023-08-02T16:48:37.095000Z \" ,\" 1985-08-02T16:48:37.095000Z \" \r \n " ,
540+ "\" John\" ,\" Doe\" ,\" 2023-08-02T16:41:55.402000Z \" ,\" 1985-08-02T16:41:55.402000Z \" \r \n " ,
541541 "select * from " + topicName );
542542 }
543543
@@ -740,6 +740,38 @@ public void testPrimitiveKey() {
740740 "select firstname, lastname, age, key from " + topicName );
741741 }
742742
743+ @ Test
744+ public void testParsingStringTimestamp () {
745+ connect .kafka ().createTopic (topicName , 1 );
746+ Map <String , String > props = baseConnectorProps (topicName );
747+ props .put ("value.converter.schemas.enable" , "false" );
748+ props .put (QuestDBSinkConnectorConfig .DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG , "born" );
749+ props .put (QuestDBSinkConnectorConfig .INCLUDE_KEY_CONFIG , "false" );
750+ props .put (QuestDBSinkConnectorConfig .TIMESTAMP_FORMAT , "yyyy-MM-dd HH:mm:ss.SSSUUU z" );
751+ props .put (QuestDBSinkConnectorConfig .TIMESTAMP_STRING_FIELDS , "born,death" );
752+
753+ connect .configureConnector (CONNECTOR_NAME , props );
754+ assertConnectorTaskRunningEventually ();
755+
756+ QuestDBUtils .assertSql (questDBContainer ,
757+ "{\" ddl\" :\" OK\" }\n " ,
758+ "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)" ,
759+ QuestDBUtils .Endpoint .EXEC );
760+
761+ String birthTimestamp = "1985-08-02 16:41:55.402095 UTC" ;
762+ String deadTimestamp = "2023-08-02 16:41:55.402095 UTC" ;
763+ connect .kafka ().produce (topicName , "foo" ,
764+ "{\" firstname\" :\" John\" "
765+ + ",\" lastname\" :\" Doe\" "
766+ + ",\" death\" :\" " + deadTimestamp + "\" "
767+ + ",\" born\" :\" " + birthTimestamp + "\" }"
768+ );
769+
770+ QuestDBUtils .assertSqlEventually (questDBContainer , "\" firstname\" ,\" lastname\" ,\" death\" ,\" born\" \r \n " +
771+ "\" John\" ,\" Doe\" ,\" 2023-08-02T16:41:55.402095Z\" ,\" 1985-08-02T16:41:55.402095Z\" \r \n " ,
772+ "select * from " + topicName );
773+ }
774+
743775 @ Test
744776 public void testCustomPrefixWithPrimitiveKeyAndValues () {
745777 connect .kafka ().createTopic (topicName , 1 );
0 commit comments