Skip to content

Commit 8ad766d

Browse files
committed
CASSJAVA-116: Retry or Speculative Execution with RequestIdGenerator throws "Duplicate Key"
patch by Jane He; reviewed by Andy Tolbert and Lukasz Atoniak for CASSJAVA-116
1 parent f631081 commit 8ad766d

File tree

4 files changed

+110
-13
lines changed

4 files changed

+110
-13
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,21 @@
1919

2020
import com.datastax.oss.driver.api.core.cql.Statement;
2121
import com.datastax.oss.driver.api.core.session.Request;
22-
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
2322
import edu.umd.cs.findbugs.annotations.NonNull;
2423
import java.nio.ByteBuffer;
2524
import java.nio.charset.StandardCharsets;
25+
import java.util.Collections;
26+
import java.util.HashMap;
2627
import java.util.Map;
2728

2829
/**
2930
* Interface responsible for generating request IDs.
3031
*
31-
* <p>Note that all request IDs have a parent/child relationship. A "parent ID" can loosely be
32-
* thought of as encompassing a sequence of a request + any attendant retries, speculative
32+
* <p>Note that all request IDs have a parent/child relationship. A "session request ID" can loosely
33+
* be thought of as encompassing a sequence of a request + any attendant retries, speculative
3334
* executions etc. It's scope is identical to that of a {@link
34-
* com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "request ID" represents a single
35-
* request within this larger scope. Note that a request corresponding to a request ID may be
35+
* com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "node request ID" represents a
36+
* single request within this larger scope. Note that a request corresponding to a request ID may be
3637
* retried; in that case the retry count will be appended to the corresponding identifier in the
3738
* logs.
3839
*/
@@ -67,11 +68,17 @@ default String getCustomPayloadKey() {
6768

6869
default Statement<?> getDecoratedStatement(
6970
@NonNull Statement<?> statement, @NonNull String requestId) {
70-
Map<String, ByteBuffer> customPayload =
71-
NullAllowingImmutableMap.<String, ByteBuffer>builder()
72-
.putAll(statement.getCustomPayload())
73-
.put(getCustomPayloadKey(), ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
74-
.build();
75-
return statement.setCustomPayload(customPayload);
71+
72+
Map<String, ByteBuffer> existing = new HashMap<>(statement.getCustomPayload());
73+
String key = getCustomPayloadKey();
74+
75+
// Add or overwrite
76+
existing.put(key, ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)));
77+
78+
// Allowing null key/values
79+
// Wrap a map inside to be immutable without instanciating a new map
80+
Map<String, ByteBuffer> unmodifiableMap = Collections.unmodifiableMap(existing);
81+
82+
return statement.setCustomPayload(unmodifiableMap);
7683
}
7784
}

core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerRetryTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,22 @@
4848
import com.datastax.oss.driver.api.core.servererrors.ServerError;
4949
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
5050
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
51+
import com.datastax.oss.driver.api.core.session.Request;
52+
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
5153
import com.datastax.oss.protocol.internal.ProtocolConstants;
5254
import com.datastax.oss.protocol.internal.response.Error;
5355
import com.datastax.oss.protocol.internal.response.error.ReadTimeout;
5456
import com.datastax.oss.protocol.internal.response.error.Unavailable;
5557
import com.datastax.oss.protocol.internal.response.error.WriteTimeout;
5658
import com.tngtech.java.junit.dataprovider.DataProvider;
5759
import com.tngtech.java.junit.dataprovider.UseDataProvider;
60+
import edu.umd.cs.findbugs.annotations.NonNull;
61+
import java.nio.ByteBuffer;
62+
import java.nio.charset.StandardCharsets;
5863
import java.util.Iterator;
5964
import java.util.concurrent.CompletionStage;
6065
import java.util.concurrent.TimeUnit;
66+
import java.util.concurrent.atomic.AtomicInteger;
6167
import org.junit.Test;
6268

6369
public class CqlRequestHandlerRetryTest extends CqlRequestHandlerTestBase {
@@ -384,6 +390,63 @@ public void should_rethrow_error_if_not_idempotent_and_error_unsafe_or_policy_re
384390
}
385391
}
386392

393+
@Test
394+
@UseDataProvider("failureAndIdempotent")
395+
public void should_not_fail_with_duplicate_key_when_retrying_with_request_id_generator(
396+
FailureScenario failureScenario, boolean defaultIdempotence, Statement<?> statement) {
397+
398+
// Create a RequestIdGenerator that uses the same key as the statement's custom payload
399+
RequestIdGenerator requestIdGenerator =
400+
new RequestIdGenerator() {
401+
private AtomicInteger counter = new AtomicInteger(0);
402+
403+
@Override
404+
public String getSessionRequestId() {
405+
return "session-123";
406+
}
407+
408+
@Override
409+
public String getNodeRequestId(@NonNull Request request, @NonNull String parentId) {
410+
return parentId + "-" + counter.getAndIncrement();
411+
}
412+
};
413+
414+
RequestHandlerTestHarness.Builder harnessBuilder =
415+
RequestHandlerTestHarness.builder()
416+
.withDefaultIdempotence(defaultIdempotence)
417+
.withRequestIdGenerator(requestIdGenerator);
418+
failureScenario.mockRequestError(harnessBuilder, node1);
419+
harnessBuilder.withResponse(node2, defaultFrameOf(singleRow()));
420+
421+
try (RequestHandlerTestHarness harness = harnessBuilder.build()) {
422+
failureScenario.mockRetryPolicyVerdict(
423+
harness.getContext().getRetryPolicy(anyString()), RetryVerdict.RETRY_NEXT);
424+
425+
CompletionStage<AsyncResultSet> resultSetFuture =
426+
new CqlRequestHandler(statement, harness.getSession(), harness.getContext(), "test")
427+
.handle();
428+
429+
// The test should succeed without throwing a duplicate key exception
430+
assertThatStage(resultSetFuture)
431+
.isSuccess(
432+
resultSet -> {
433+
Iterator<Row> rows = resultSet.currentPage().iterator();
434+
assertThat(rows.hasNext()).isTrue();
435+
assertThat(rows.next().getString("message")).isEqualTo("hello, world");
436+
437+
ExecutionInfo executionInfo = resultSet.getExecutionInfo();
438+
assertThat(executionInfo.getCoordinator()).isEqualTo(node2);
439+
assertThat(executionInfo.getErrors()).hasSize(1);
440+
assertThat(executionInfo.getErrors().get(0).getKey()).isEqualTo(node1);
441+
442+
// Verify that the custom payload still contains the request ID key
443+
// (either the original value or the generated one, depending on implementation)
444+
assertThat(executionInfo.getRequest().getCustomPayload().get("request-id"))
445+
.isEqualTo(ByteBuffer.wrap("session-123-1".getBytes(StandardCharsets.UTF_8)));
446+
});
447+
}
448+
}
449+
387450
/**
388451
* Sets up the mocks to simulate an error from a node, and make the retry policy return a given
389452
* decision for that error.

core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.datastax.oss.driver.api.core.session.Session;
3838
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
3939
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
40+
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
4041
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
4142
import com.datastax.oss.driver.internal.core.DefaultConsistencyLevelRegistry;
4243
import com.datastax.oss.driver.internal.core.ProtocolFeature;
@@ -170,7 +171,8 @@ protected RequestHandlerTestHarness(Builder builder) {
170171

171172
when(context.getRequestTracker()).thenReturn(new NoopRequestTracker(context));
172173

173-
when(context.getRequestIdGenerator()).thenReturn(Optional.empty());
174+
when(context.getRequestIdGenerator())
175+
.thenReturn(Optional.ofNullable(builder.requestIdGenerator));
174176
}
175177

176178
public DefaultSession getSession() {
@@ -203,6 +205,7 @@ public static class Builder {
203205
private final List<PoolBehavior> poolBehaviors = new ArrayList<>();
204206
private boolean defaultIdempotence;
205207
private ProtocolVersion protocolVersion;
208+
private RequestIdGenerator requestIdGenerator;
206209

207210
/**
208211
* Sets the given node as the next one in the query plan; an empty pool will be simulated when
@@ -258,6 +261,11 @@ public Builder withProtocolVersion(ProtocolVersion protocolVersion) {
258261
return this;
259262
}
260263

264+
public Builder withRequestIdGenerator(RequestIdGenerator requestIdGenerator) {
265+
this.requestIdGenerator = requestIdGenerator;
266+
return this;
267+
}
268+
261269
/**
262270
* Sets the given node as the next one in the query plan; the test code is responsible of
263271
* calling the methods on the returned object to complete the write and the query.

integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
*/
1818
package com.datastax.oss.driver.core.tracker;
1919

20+
import static com.datastax.oss.driver.Assertions.assertThatStage;
2021
import static org.assertj.core.api.Assertions.assertThat;
2122

2223
import com.datastax.oss.driver.api.core.CqlSession;
2324
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2425
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
2526
import com.datastax.oss.driver.api.core.cql.ResultSet;
27+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
2628
import com.datastax.oss.driver.api.core.cql.Statement;
2729
import com.datastax.oss.driver.api.core.session.Request;
2830
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
@@ -119,7 +121,24 @@ public void should_not_write_id_to_custom_payload_when_key_is_not_set() {
119121
try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
120122
String query = "SELECT * FROM system.local";
121123
ResultSet rs = session.execute(query);
122-
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key")).isNull();
124+
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id")).isNull();
125+
}
126+
}
127+
128+
@Test
129+
public void should_succeed_with_null_value_in_custom_payload() {
130+
DriverConfigLoader loader =
131+
SessionUtils.configLoaderBuilder()
132+
.withString(
133+
DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, "W3CContextRequestIdGenerator")
134+
.build();
135+
try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
136+
String query = "SELECT * FROM system.local";
137+
Map<String, ByteBuffer> customPayload =
138+
new NullAllowingImmutableMap.Builder<String, ByteBuffer>(1).put("my_key", null).build();
139+
SimpleStatement statement =
140+
SimpleStatement.newInstance(query).setCustomPayload(customPayload);
141+
assertThatStage(session.executeAsync(statement)).isSuccess();
123142
}
124143
}
125144
}

0 commit comments

Comments
 (0)