Skip to content

Commit e3792ce

Browse files
authored
feat: support for HTTP transport (#13)
QuestDB ILP client supports HTTP transport in versions 7.4.0+ thus the client dependency version is updated too. Configuration The connector now supports client configuration strings as documented in the client documentation. Set the client configuration string under the key client.conf.string. Example: ```json { "name": "my-connector", "config": { "tasks.max": "4", "connector.class": "io.questdb.kafka.QuestDBSinkConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "topics": "mytopic", "value.converter.schemas.enable": "false", "timestamp.field.name": "ts", "timestamp.units": "nanos", "client.conf.string": "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;" } } ``` The old client configuration keys (host, username, token, tls, etc) are considered deprecated and should not be used. They cannot be used together with the client configuration string. Kafka-specific configuration options (timestamp.units, topics, etc.) are not part of the client configuration string and still have their own keys. See the example above. Advantages of the HTTP transport The HTTP transport implements proper error reporting, automatic retries for recoverable errors and simpler configuration. When combined with QuestDB server deduplications it allows Exactly-Once semantics. Known Issues Due to a bug in QuestDB client 7.4.0 it's recommended to disable interval-based flushes. See questdb/questdb#4372 for details. This will be resolved in the next version.
1 parent 164e80e commit e3792ce

File tree

25 files changed

+497
-469
lines changed

25 files changed

+497
-469
lines changed

connector/src/main/java/io/questdb/kafka/BufferingSender.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io.questdb.std.BoolList;
55
import io.questdb.std.LongList;
66

7+
import java.time.Instant;
8+
import java.time.temporal.ChronoUnit;
79
import java.util.ArrayList;
810
import java.util.HashSet;
911
import java.util.List;
@@ -96,17 +98,22 @@ public Sender boolColumn(CharSequence name, boolean value) {
9698
}
9799

98100
@Override
99-
public Sender timestampColumn(CharSequence name, long value) {
101+
public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) {
100102
if (symbolColumns.contains(name)) {
101103
symbolColumnNames.add(name);
102104
symbolColumnValues.add(String.valueOf(value));
103105
} else {
104106
timestampNames.add(name);
105-
timestampValues.add(value);
107+
timestampValues.add(unitToMicros(value, unit));
106108
}
107109
return this;
108110
}
109111

112+
@Override
113+
public Sender timestampColumn(CharSequence charSequence, Instant instant) {
114+
throw new UnsupportedOperationException("Not implemented");
115+
}
116+
110117
@Override
111118
public Sender symbol(CharSequence name, CharSequence value) {
112119
symbolColumnNames.add(name);
@@ -164,16 +171,36 @@ private void transferFields() {
164171
for (int i = 0, n = timestampNames.size(); i < n; i++) {
165172
CharSequence fieldName = timestampNames.get(i);
166173
long fieldValue = timestampValues.get(i);
167-
sender.timestampColumn(fieldName, fieldValue);
174+
sender.timestampColumn(fieldName, fieldValue, ChronoUnit.MICROS);
168175
}
169176
timestampNames.clear();
170177
timestampValues.clear();
171178
}
172179

180+
private static long unitToMicros(long value, ChronoUnit unit) {
181+
switch (unit) {
182+
case NANOS:
183+
return value / 1000L;
184+
case MICROS:
185+
return value;
186+
case MILLIS:
187+
return value * 1000L;
188+
case SECONDS:
189+
return value * 1_000_000L;
190+
default:
191+
throw new IllegalArgumentException("Unsupported unit: " + unit);
192+
}
193+
}
194+
173195
@Override
174-
public void at(long timestamp) {
196+
public void at(long timestamp, ChronoUnit unit) {
175197
transferFields();
176-
sender.at(timestamp);
198+
sender.at(timestamp, unit);
199+
}
200+
201+
@Override
202+
public void at(Instant instant) {
203+
throw new UnsupportedOperationException();
177204
}
178205

179206
@Override

connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java

Lines changed: 0 additions & 122 deletions
This file was deleted.

connector/src/main/java/io/questdb/kafka/OffsetTracker.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,46 @@ public ConfigDef config() {
4949
public Config validate(Map<String, String> connectorConfigs) {
5050
String s = connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG);
5151
if (Boolean.parseBoolean(s) && connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG) != null) {
52-
throw new IllegalArgumentException("Cannot use " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG
53-
+ " with " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +". These options are mutually exclusive.");
52+
throw new IllegalArgumentException("Cannot use '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG
53+
+ "' with '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +"'. These options are mutually exclusive.");
5454
}
55+
56+
validateClientConfiguration(connectorConfigs);
5557
return super.validate(connectorConfigs);
5658
}
59+
60+
private static void validateClientConfiguration(Map<String, String> connectorConfigs) {
61+
String host = connectorConfigs.get(QuestDBSinkConnectorConfig.HOST_CONFIG);
62+
String confString = connectorConfigs.get(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG);
63+
String envConfString = System.getenv("QDB_CLIENT_CONF");
64+
65+
// cannot set client configuration string via both explicit config and environment variable
66+
if (confString != null && envConfString != null) {
67+
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or QDB_CLIENT_CONF environment variable must be set. They cannot be used together.");
68+
}
69+
70+
if (confString == null && envConfString == null) {
71+
if (host == null) {
72+
throw new IllegalArgumentException("Either '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' must be set.");
73+
}
74+
return; // configuration string is not used, nothing else to validate
75+
}
76+
77+
// configuration string is used, let's validate no other client configuration is set
78+
if (host != null) {
79+
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
80+
}
81+
if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS) != null) {
82+
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
83+
}
84+
if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG) != null) {
85+
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
86+
}
87+
if (connectorConfigs.get(QuestDBSinkConnectorConfig.TOKEN) != null) {
88+
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TOKEN + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
89+
}
90+
if (connectorConfigs.get(QuestDBSinkConnectorConfig.USERNAME) != null) {
91+
throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.USERNAME + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set.");
92+
}
93+
}
5794
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
6161
public static final String TLS_VALIDATION_MODE_CONFIG = "tls.validation.mode";
6262
public static final String TLS_VALIDATION_MODE_DOC = "TLS validation mode. Possible values: default, insecure";
6363

64+
public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string";
65+
public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client";
66+
6467
public static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
6568
private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made";
6669

@@ -70,15 +73,6 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
7073
public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
7174
private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";
7275

73-
public static final String DEDUPLICATION_REWIND_CONFIG = "dedup.rewind.offset";
74-
private static final String DEDUPLICATION_REWIND_DOC = "Rewind offset for deduplication. " +
75-
"On failure, the connector will rewind the offset by this amount and retry. This is designed to work in concert with QuestDB " +
76-
"deduplication feature. The rewind offset must be greater than or equal to the maximum number of records that can lost in the event of a failure. " +
77-
"If the rewind is too small, some events might be missing from QuestDB. If the rewind is too large, the connector will be slower to recover " +
78-
"as it will have to reprocess a large number of records and QuestDB will have to do extra work with deduplication. If you are testing this " +
79-
"feature for the first time then 150000 is a good starting point.";
80-
81-
8276
private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";
8377

8478
public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -91,7 +85,7 @@ public QuestDBSinkConnectorConfig(Map<String, String> parsedConfig) {
9185

9286
public static ConfigDef conf() {
9387
return new ConfigDef()
94-
.define(HOST_CONFIG, Type.STRING, Importance.HIGH, HOST_DOC)
88+
.define(HOST_CONFIG, Type.STRING, null, Importance.HIGH, HOST_DOC)
9589
.define(TABLE_CONFIG, Type.STRING, null, TablenameValidator.INSTANCE, Importance.HIGH, TABLE_DOC)
9690
.define(KEY_PREFIX_CONFIG, Type.STRING, "key", Importance.MEDIUM, KEY_PREFIX_DOC)
9791
.define(VALUE_PREFIX_CONFIG, Type.STRING, "", Importance.MEDIUM, VALUE_PREFIX_DOC)
@@ -109,12 +103,12 @@ public static ConfigDef conf() {
109103
.define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
110104
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
111105
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
112-
.define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC)
113-
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC);
106+
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
107+
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
114108
}
115109

116-
public long getDeduplicationRewindOffset() {
117-
return getLong(DEDUPLICATION_REWIND_CONFIG);
110+
public Password getConfigurationString() {
111+
return getPassword(CONFIGURATION_STRING_CONFIG);
118112
}
119113

120114
public String getTlsValidationMode() {

0 commit comments

Comments
 (0)