Skip to content

Commit dcabde1

Browse files
committed
Add support for DynamoDB Streams with Kinesis
* Add `com.amazonaws:dynamodb-streams-kinesis-adapter` dependency which provides and adapter from DynamoDB Streams Client to Kinesis Client. That also incudes some convenient API for KCL configuration. * Add `SpringDynamoDBStreamsAdapterClient` extension to mimic Kinesis API required for the `KinesisMessageDrivenChannelAdapter` * Support DynamoDB Streams Adapter configuration in the `KclMessageDrivenChannelAdapter` * Add tests to verify DynamoDB Stream consumption via both `KinesisMessageDrivenChannelAdapter` and `KclMessageDrivenChannelAdapter` * Improve other KCL tests for race conditions and timing * Implement DynamoDB Streams support in the Kinesis Binder * Include `dynamodb-streams-kinesis-adapter` dependency into `spring-cloud-aws-starter-integration-kinesis-client` since `KclMessageDrivenChannelAdapter` now depends on that adapter API. * Fix "shard iterator ready" race condition in the `KinesisBinderFunctionalTests` Related to: spring-cloud/spring-cloud-stream-binder-aws-kinesis#205
1 parent d1db4fc commit dcabde1

File tree

25 files changed

+1134
-37
lines changed

25 files changed

+1134
-37
lines changed

docs/src/main/asciidoc/kinesis-stream-binder.adoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,11 @@ The KCL graceful shutdown timeout in milliseconds.
308308
+
309309
Default: 0 - regular shutdown process
310310

311+
dynamoDbStreams::
312+
Consume from DynamoDB Streams.
313+
+
314+
Default: false
315+
311316
The `KclMessageDrivenChannelAdapter` can be customized programmatically for the `ConfigsBuilder` parts.
312317
For example, to set a custom value for the `LeaseManagementConfig.maxLeasesForWorker` property, the `ConsumerEndpointCustomizer<KclMessageDrivenChannelAdapter>` bean has to be provided:
313318

@@ -479,6 +484,15 @@ If [Server-Side Encryption](https://docs.aws.amazon.com/streams/latest/dev/what-
479484
}
480485
----
481486

487+
[[dynamodb-streams-support]]
488+
=== DynamoDB Streams Support
489+
490+
The Kinesis Binder provides support for consuming CDC events from the https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html[DynamoDB Streams].
491+
The input binding has to explicitly opt-in for this feature, e.g., `spring.cloud.stream.kinesis.bindings.dynamoDbConsumer-in-0.consumer.dynamoDbStreams=true`.
492+
The rest of the configuration is done automatically by the binder for both classic and KCL consumers.
493+
494+
NOTE: The binding destination for DynamoDB Stream has to be specified as a Stream ARN on the table to capture changes.
495+
482496
[[telling-the-binder-to-use-your-local-endpoint]]
483497
=== Telling the binder to use your local endpoint
484498

docs/src/main/asciidoc/kinesis.adoc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,17 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter(KinesisAsyn
179179

180180
NOTE: Unlike `KinesisMessageDrivenChannelAdapter`, the `KclMessageDrivenChannelAdapter` does not support explicit shard assignments.
181181

182-
=== Spring Integration Starters
182+
==== DynamoDB Streams Support
183+
184+
The `KinesisMessageDrivenChannelAdapter` and `KclMessageDrivenChannelAdapter` provides support for consuming CDC events from the https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html[DynamoDB Streams].
185+
The `com.amazonaws:dynamodb-streams-kinesis-adapter` dependency must be present on classpath.
186+
The KCL provides native support for the mentioned adapter, and only instance of `DynamoDbStreamsClient` has to be injected into the `KclMessageDrivenChannelAdapter` to switch consuming logic from the Kinesis stream to DynamoDB stream.
187+
For consuming via `KinesisMessageDrivenChannelAdapter`, the `SpringDynamoDBStreamsAdapterClient` has to be injected instead of regular `KinesisAsyncClient`.
188+
The `SpringDynamoDBStreamsAdapterClient` is an extension of the `AmazonDynamoDBStreamsAdapterClient` with overridden `listShards()` and `getRecords()` operations to mimic `KinesisAsyncClient` API called from the `KinesisMessageDrivenChannelAdapter` logic.
189+
Both channel adapters require a Stream ARN for DynamoDB Stream on the table.
190+
Using AWS SDK API, such a value is available as a result of the `DescribeTableResponse.table().latestStreamArn()` in the answer to `DynamoDbAsyncClient.describeTable()` request.
191+
192+
==== Spring Integration Starters
183193

184194
The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
185195
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.

spring-cloud-aws-dependencies/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<awssdk-v2.version>2.32.28</awssdk-v2.version>
2828
<kcl.version>3.1.2</kcl.version>
2929
<kpl.version>1.0.4</kpl.version>
30+
<dynamodb-streams.version>2.0.1</dynamodb-streams.version>
3031
<amazon.dax.version>2.0.5</amazon.dax.version>
3132
<amazon.encryption.s3.version>3.3.5</amazon.encryption.s3.version>
3233
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
@@ -294,7 +295,11 @@
294295
<artifactId>amazon-kinesis-producer</artifactId>
295296
<version>${kpl.version}</version>
296297
</dependency>
297-
298+
<dependency>
299+
<groupId>com.amazonaws</groupId>
300+
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
301+
<version>${dynamodb-streams.version}</version>
302+
</dependency>
298303
</dependencies>
299304
</dependencyManagement>
300305

spring-cloud-aws-kinesis-stream-binder/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
<groupId>software.amazon.kinesis</groupId>
3838
<artifactId>amazon-kinesis-producer</artifactId>
3939
</dependency>
40+
<dependency>
41+
<groupId>com.amazonaws</groupId>
42+
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
43+
</dependency>
4044
<dependency>
4145
<groupId>org.springframework.cloud</groupId>
4246
<artifactId>spring-cloud-stream</artifactId>

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/KinesisMessageChannelBinder.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.awspring.cloud.kinesis.integration.KinesisMessageHeaderErrorMessageStrategy;
2222
import io.awspring.cloud.kinesis.integration.KinesisShardOffset;
2323
import io.awspring.cloud.kinesis.integration.KplMessageHandler;
24+
import io.awspring.cloud.kinesis.integration.SpringDynamoDBStreamsAdapterClient;
2425
import io.awspring.cloud.kinesis.stream.binder.properties.KinesisBinderConfigurationProperties;
2526
import io.awspring.cloud.kinesis.stream.binder.properties.KinesisConsumerProperties;
2627
import io.awspring.cloud.kinesis.stream.binder.properties.KinesisExtendedBindingProperties;
@@ -71,6 +72,7 @@
7172
import org.springframework.util.StringUtils;
7273
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
7374
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
75+
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
7476
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
7577
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
7678
import software.amazon.awssdk.services.kinesis.model.Shard;
@@ -112,6 +114,8 @@ public class KinesisMessageChannelBinder extends
112114

113115
private final BytesMessageMapper embeddedHeadersMapper;
114116

117+
private final DynamoDbStreamsClient dynamoDBStreams;
118+
115119
private KinesisExtendedBindingProperties extendedBindingProperties = new KinesisExtendedBindingProperties();
116120

117121
@Nullable
@@ -130,15 +134,16 @@ public class KinesisMessageChannelBinder extends
130134
@SuppressWarnings("removal")
131135
public KinesisMessageChannelBinder(KinesisBinderConfigurationProperties configurationProperties,
132136
KinesisStreamProvisioner provisioningProvider, KinesisAsyncClient amazonKinesis,
133-
@Nullable DynamoDbAsyncClient dynamoDBClient, @Nullable CloudWatchAsyncClient cloudWatchClient) {
137+
@Nullable DynamoDbAsyncClient dynamoDBClient, @Nullable DynamoDbStreamsClient dynamoDBStreams,
138+
@Nullable CloudWatchAsyncClient cloudWatchClient) {
134139

135140
super(new String[0], provisioningProvider);
136141
Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null");
137142
this.configurationProperties = configurationProperties;
138143
this.amazonKinesis = amazonKinesis;
139144
this.cloudWatchClient = cloudWatchClient;
140145
this.dynamoDBClient = dynamoDBClient;
141-
146+
this.dynamoDBStreams = dynamoDBStreams;
142147
this.embeddedHeadersMapper = new org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper(
143148
headersToMap(configurationProperties));
144149
}
@@ -350,6 +355,9 @@ private MessageProducerSupport createKclConsumerEndpoint(ConsumerDestination des
350355

351356
KclMessageDrivenChannelAdapter adapter = new KclMessageDrivenChannelAdapter(this.amazonKinesis,
352357
this.cloudWatchClient, this.dynamoDBClient, streams);
358+
if (kinesisConsumerProperties.isDynamoDbStreams()) {
359+
adapter.setDynamoDBStreams(this.dynamoDBStreams);
360+
}
353361

354362
boolean anonymous = !StringUtils.hasText(group);
355363
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID() : group;
@@ -463,24 +471,28 @@ private MessageProducerSupport createKinesisConsumerEndpoint(ConsumerDestination
463471
}
464472
}
465473

474+
KinesisAsyncClient amazonKinesisClient = kinesisConsumerProperties.isDynamoDbStreams()
475+
? new SpringDynamoDBStreamsAdapterClient(this.dynamoDBStreams)
476+
: this.amazonKinesis;
477+
466478
KinesisMessageDrivenChannelAdapter adapter;
467479

468480
String shardId = kinesisConsumerProperties.getShardId();
469481

470482
if (CollectionUtils.isEmpty(shardOffsets) && shardId == null) {
471483
String[] streams = Arrays.stream(StringUtils.commaDelimitedListToStringArray(destination.getName()))
472484
.map(String::trim).toArray(String[]::new);
473-
adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, streams);
485+
adapter = new KinesisMessageDrivenChannelAdapter(amazonKinesisClient, streams);
474486
}
475487
else if (shardId != null) {
476488
Assert.state(!properties.isMultiplex(), "Cannot use multi-stream binding together with 'shard-id'");
477489
KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset);
478490
shardOffset.setStream(destination.getName());
479491
shardOffset.setShard(shardId);
480-
adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, shardOffset);
492+
adapter = new KinesisMessageDrivenChannelAdapter(amazonKinesisClient, shardOffset);
481493
}
482494
else {
483-
adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis,
495+
adapter = new KinesisMessageDrivenChannelAdapter(amazonKinesisClient,
484496
shardOffsets.toArray(new KinesisShardOffset[0]));
485497
}
486498

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.stream.binder.config;
17+
18+
import io.awspring.cloud.autoconfigure.AwsClientProperties;
19+
import org.springframework.boot.context.properties.ConfigurationProperties;
20+
21+
/**
22+
* {@link ConfigurationProperties} for configuring the DynamoDb Streams client.
23+
*
24+
* @author Artem Bilan
25+
*
26+
* @since 4.0
27+
*/
28+
@ConfigurationProperties(prefix = "spring.cloud.aws.dynamodb-streams")
29+
public class DynamoDbStreamsProperties extends AwsClientProperties {
30+
}

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/config/KinesisBinderConfiguration.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
6262
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
6363
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
64+
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
65+
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClientBuilder;
6466
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
6567
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
6668
import software.amazon.kinesis.producer.KinesisProducerConfiguration;
@@ -79,7 +81,8 @@
7981
@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
8082
@ConditionalOnMissingBean(Binder.class)
8183
@EnableConfigurationProperties({ KinesisBinderConfigurationProperties.class, KinesisExtendedBindingProperties.class,
82-
KinesisProperties.class, DynamoDbProperties.class, CloudWatchProperties.class })
84+
KinesisProperties.class, DynamoDbProperties.class, DynamoDbStreamsProperties.class,
85+
CloudWatchProperties.class })
8386
public class KinesisBinderConfiguration {
8487

8588
private final KinesisBinderConfigurationProperties configurationProperties;
@@ -216,20 +219,36 @@ public KinesisProducerConfiguration kinesisProducerConfiguration() {
216219
return kinesisProducerConfiguration;
217220
}
218221

222+
@Bean
223+
@ConditionalOnMissingBean
224+
public DynamoDbStreamsClient dynamoDBStreams(DynamoDbStreamsProperties properties,
225+
ObjectProvider<AwsClientCustomizer<DynamoDbStreamsClientBuilder>> configurer) {
226+
if (this.hasInputs) {
227+
return awsClientBuilderConfigurer
228+
.configureAsyncClient(DynamoDbStreamsClient.builder(), properties, null, configurer.stream(), null)
229+
.build();
230+
}
231+
else {
232+
return null;
233+
}
234+
}
235+
219236
@Bean
220237
public KinesisMessageChannelBinder kinesisMessageChannelBinder(KinesisStreamProvisioner provisioningProvider,
221238
KinesisAsyncClient amazonKinesis, KinesisExtendedBindingProperties kinesisExtendedBindingProperties,
222239
@Autowired(required = false) ConcurrentMetadataStore kinesisCheckpointStore,
223240
@Autowired(required = false) LockRegistry<?> lockRegistry,
224241
@Autowired(required = false) DynamoDbAsyncClient dynamoDBClient,
242+
@Autowired(required = false) DynamoDbStreamsClient dynamoDBStreams,
225243
@Autowired(required = false) CloudWatchAsyncClient cloudWatchClient,
226244
@Autowired(required = false) KinesisProducerConfiguration kinesisProducerConfiguration,
227245
@Autowired(required = false) ProducerMessageHandlerCustomizer<? extends AbstractMessageProducingHandler> producerMessageHandlerCustomizer,
228246
@Autowired(required = false) ConsumerEndpointCustomizer<? extends MessageProducerSupport> consumerEndpointCustomizer,
229247
@Autowired ObservationRegistry observationRegistry) {
230248

231249
KinesisMessageChannelBinder kinesisMessageChannelBinder = new KinesisMessageChannelBinder(
232-
this.configurationProperties, provisioningProvider, amazonKinesis, dynamoDBClient, cloudWatchClient);
250+
this.configurationProperties, provisioningProvider, amazonKinesis, dynamoDBClient, dynamoDBStreams,
251+
cloudWatchClient);
233252
kinesisMessageChannelBinder.setCheckpointStore(kinesisCheckpointStore);
234253
kinesisMessageChannelBinder.setLockRegistry(lockRegistry);
235254
kinesisMessageChannelBinder.setExtendedBindingProperties(kinesisExtendedBindingProperties);

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/properties/KinesisConsumerProperties.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public class KinesisConsumerProperties {
9292

9393
private boolean embedHeaders = true;
9494

95+
private boolean dynamoDbStreams;
96+
9597
/**
9698
* The {@link MetricsLevel} for emitting (or not) metrics into Cloud Watch.
9799
*/
@@ -241,4 +243,12 @@ public void setGracefulShutdownTimeout(long gracefulShutdownTimeout) {
241243
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
242244
}
243245

246+
public boolean isDynamoDbStreams() {
247+
return this.dynamoDbStreams;
248+
}
249+
250+
public void setDynamoDbStreams(boolean dynamoDbStreams) {
251+
this.dynamoDbStreams = dynamoDbStreams;
252+
}
253+
244254
}

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/provisioning/KinesisStreamProvisioner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.awspring.cloud.kinesis.stream.binder.properties.KinesisConsumerProperties;
2020
import io.awspring.cloud.kinesis.stream.binder.properties.KinesisProducerProperties;
2121
import java.time.Duration;
22+
import java.util.Collections;
2223
import java.util.List;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CompletionException;
@@ -94,6 +95,11 @@ public ConsumerDestination provisionConsumerDestination(String name, String grou
9495
properties.setHeaderMode(HeaderMode.none);
9596
}
9697

98+
if (kinesisConsumerProperties.isDynamoDbStreams()) {
99+
logger.info(() -> "Using DynamoDB table in DynamoDB Streams support for inbound: " + name);
100+
return new KinesisConsumerDestination(name, Collections.emptyList());
101+
}
102+
97103
int shardCount = properties.getInstanceCount() * properties.getConcurrency();
98104

99105
if (!properties.isMultiplex()) {

spring-cloud-aws-kinesis-stream-binder/src/test/java/io/awspring/cloud/kinesis/stream/binder/KinesisBinderFunctionalTests.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.awspring.cloud.kinesis.stream.binder;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.awaitility.Awaitility.await;
1920

2021
import com.fasterxml.jackson.core.JsonProcessingException;
2122
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -85,6 +86,23 @@ public class KinesisBinderFunctionalTests implements LocalstackContainerTest {
8586
@SuppressWarnings("unchecked")
8687
@Test
8788
void testKinesisFunction() throws JsonProcessingException, InterruptedException {
89+
List<Binding<?>> consumerBindings = this.bindingService
90+
.getConsumerBindings("eventConsumerBatchProcessingWithHeaders-in-0");
91+
92+
assertThat(consumerBindings).hasSize(1);
93+
94+
Binding<?> binding = consumerBindings.get(0);
95+
96+
Map<KinesisShardOffset, ?> shardConsumers = TestUtils.getPropertyValue(binding, "lifecycle.shardConsumers",
97+
Map.class);
98+
assertThat(shardConsumers).hasSize(2).hasKeySatisfying(keySatisfyingCondition(KINESIS_STREAM))
99+
.hasKeySatisfying(keySatisfyingCondition("some_other_stream"));
100+
101+
Object shardConsumer = shardConsumers.values().iterator().next();
102+
103+
await().untilAsserted(
104+
() -> assertThat(TestUtils.getPropertyValue(shardConsumer, "state").toString()).isEqualTo("CONSUME"));
105+
88106
PutRecordsRequest.Builder putRecordsRequest = PutRecordsRequest.builder().streamName(KINESIS_STREAM);
89107

90108
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
@@ -118,18 +136,6 @@ void testKinesisFunction() throws JsonProcessingException, InterruptedException
118136

119137
assertThat(messageFromBatch.getPayload()).isEqualTo("Message0");
120138
assertThat(messageFromBatch.getHeaders()).containsEntry("event.eventType", "createEvent");
121-
122-
List<Binding<?>> consumerBindings = this.bindingService
123-
.getConsumerBindings("eventConsumerBatchProcessingWithHeaders-in-0");
124-
125-
assertThat(consumerBindings).hasSize(1);
126-
127-
Binding<?> binding = consumerBindings.get(0);
128-
129-
Map<KinesisShardOffset, ?> shardConsumers = TestUtils.getPropertyValue(binding, "lifecycle.shardConsumers",
130-
Map.class);
131-
assertThat(shardConsumers).hasSize(2).hasKeySatisfying(keySatisfyingCondition(KINESIS_STREAM))
132-
.hasKeySatisfying(keySatisfyingCondition("some_other_stream"));
133139
}
134140

135141
private static Condition<KinesisShardOffset> keySatisfyingCondition(String streamName) {

0 commit comments

Comments
 (0)