22
33import io .questdb .client .Sender ;
44import io .questdb .cutlass .line .LineSenderException ;
5+ import io .questdb .std .NumericException ;
6+ import io .questdb .std .datetime .DateFormat ;
57import io .questdb .std .datetime .microtime .Timestamps ;
8+ import io .questdb .std .datetime .millitime .DateFormatUtils ;
69import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
710import org .apache .kafka .common .TopicPartition ;
811import org .apache .kafka .connect .data .Date ;
1922import org .slf4j .Logger ;
2023import org .slf4j .LoggerFactory ;
2124
22- import java .time .format .DateTimeFormatter ;
23- import java .time .temporal .ChronoField ;
24- import java .time .temporal .TemporalAccessor ;
2525import java .util .Collection ;
2626import java .util .Collections ;
2727import java .util .HashSet ;
@@ -45,7 +45,7 @@ public final class QuestDBSinkTask extends SinkTask {
4545 private Set <String > stringTimestampColumns ;
4646 private int remainingRetries ;
4747 private long batchesSinceLastError = 0 ;
48- private DateTimeFormatter timestampParser ;
48+ private DateFormat dataFormat ;
4949
5050 @ Override
5151 public String version () {
@@ -61,7 +61,7 @@ public void start(Map<String, String> map) {
6161 for (String symbolColumn : timestampStringFields .split ("," )) {
6262 stringTimestampColumns .add (symbolColumn .trim ());
6363 }
64- this . timestampParser = DateTimeFormatter . ofPattern (config .getDefaultTimestampFormat ());
64+ dataFormat = TimestampParserCompiler . compilePattern (config .getDefaultTimestampFormat ());
6565 } else {
6666 stringTimestampColumns = Collections .emptySet ();
6767 }
@@ -251,7 +251,7 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema)
251251 }
252252 if (value instanceof String ) {
253253 log .debug ("Timestamp column value is a string" );
254- return parseTimestamp ((String ) value , TimeUnit . NANOSECONDS ) ;
254+ return parseToMicros ((String ) value ) * 1000 ;
255255 }
256256 if (!(value instanceof Long )) {
257257 throw new ConnectException ("Unsupported timestamp column type: " + value .getClass ());
@@ -277,7 +277,7 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
277277 if (value instanceof String ) {
278278 String stringVal = (String ) value ;
279279 if (stringTimestampColumns .contains (actualName )) {
280- long timestamp = parseTimestamp (stringVal , TimeUnit . MICROSECONDS );
280+ long timestamp = parseToMicros (stringVal );
281281 sender .timestampColumn (actualName , timestamp );
282282 } else {
283283 sender .stringColumn (actualName , stringVal );
@@ -310,11 +310,12 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
310310 }
311311 }
312312
313- private long parseTimestamp (String timestamp , TimeUnit unit ) {
314- TemporalAccessor accessor = timestampParser .parse (timestamp );
315- long seconds = accessor .getLong (ChronoField .INSTANT_SECONDS );
316- int nanos = accessor .get (ChronoField .NANO_OF_SECOND );
317- return unit .convert (seconds , TimeUnit .SECONDS ) + unit .convert (nanos , TimeUnit .NANOSECONDS );
313+ private long parseToMicros (String timestamp ) {
314+ try {
315+ return dataFormat .parse (timestamp , DateFormatUtils .enLocale );
316+ } catch (NumericException e ) {
317+ throw new ConnectException ("Cannot parse timestamp: " + timestamp , e );
318+ }
318319 }
319320
320321 private static String sanitizeName (String name ) {
@@ -349,7 +350,7 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec
349350 case STRING :
350351 String s = (String ) value ;
351352 if (stringTimestampColumns .contains (primitiveTypesName )) {
352- long timestamp = parseTimestamp ( s , TimeUnit . MICROSECONDS );
353+ long timestamp = parseToMicros ( s );
353354 sender .timestampColumn (sanitizedName , timestamp );
354355 } else {
355356 sender .stringColumn (sanitizedName , s );
0 commit comments