Skip to content

Commit 0f146cd

Browse files
committed
always re-create client upon an error
1 parent e3792ce commit 0f146cd

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,13 @@ public void put(Collection<SinkRecord> collection) {
164164

165165
private void onSenderException(Exception e) {
166166
if (httpTransport) {
167+
closeSenderSilently();
167168
throw new ConnectException("Failed to send data to QuestDB", e);
168169
}
169170

170171
batchesSinceLastError = 0;
171172
if (--remainingRetries > 0) {
172173
closeSenderSilently();
173-
sender = null;
174174
log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs());
175175
context.timeout(config.getRetryBackoffMs());
176176
throw new RetriableException(e);
@@ -180,12 +180,14 @@ private void onSenderException(Exception e) {
180180
}
181181

182182
private void closeSenderSilently() {
183-
try {
184-
if (sender != null) {
183+
if (sender != null) {
184+
try {
185185
sender.close();
186+
} catch (Exception ex) {
187+
log.warn("Failed to close sender", ex);
188+
} finally {
189+
sender = null;
186190
}
187-
} catch (Exception ex) {
188-
log.warn("Failed to close sender", ex);
189191
}
190192
}
191193

@@ -445,7 +447,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
445447
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
446448
if (httpTransport) {
447449
try {
448-
log.info("Flushing data to QuestDB");
450+
log.debug("Flushing data to QuestDB");
449451
sender.flush();
450452
} catch (LineSenderException | HttpClientException e) {
451453
onSenderException(e);

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,9 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
307307
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
308308
+ "\"John\",\"Doe\",49\r\n",
309309
"select firstname,lastname,age from " + topicName + " where age = 49",
310-
httpPort);
310+
20,
311+
httpPort
312+
);
311313
}
312314

313315
@ParameterizedTest

0 commit comments

Comments
 (0)