Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions docs/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ include::cloudwatch.adoc[]

include::spring-modulith.adoc[]

include::kinesis-stream-binder.adoc[]

include::testing.adoc[]

include::docker-compose.adoc[]
Expand Down
520 changes: 520 additions & 0 deletions docs/src/main/asciidoc/kinesis-stream-binder.adoc

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion docs/src/main/asciidoc/kinesis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,17 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter(KinesisAsyn

NOTE: Unlike `KinesisMessageDrivenChannelAdapter`, the `KclMessageDrivenChannelAdapter` does not support explicit shard assignments.

=== Spring Integration Starters
==== DynamoDB Streams Support

The `KinesisMessageDrivenChannelAdapter` and `KclMessageDrivenChannelAdapter` provide support for consuming CDC events from the https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html[DynamoDB Streams].
The `com.amazonaws:dynamodb-streams-kinesis-adapter` dependency must be present on classpath.
The KCL provides native support for the mentioned adapter, and only instance of `DynamoDbStreamsClient` has to be injected into the `KclMessageDrivenChannelAdapter` to switch consuming logic from the Kinesis stream to DynamoDB stream.
For consuming via `KinesisMessageDrivenChannelAdapter`, the `SpringDynamoDBStreamsAdapterClient` has to be injected instead of regular `KinesisAsyncClient`.
The `SpringDynamoDBStreamsAdapterClient` is an extension of the `AmazonDynamoDBStreamsAdapterClient` with overridden `listShards()` and `getRecords()` operations to mimic `KinesisAsyncClient` API called from the `KinesisMessageDrivenChannelAdapter` logic.
Both channel adapters require a Stream ARN for DynamoDB Stream on the table.
Using AWS SDK API, such a Stream ARN value is available as a result of the `DescribeTableResponse.table().latestStreamArn()` in the answer to `DynamoDbAsyncClient.describeTable()` request.

==== Spring Integration Starters

The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<module>spring-cloud-aws-sqs</module>
<module>spring-cloud-aws-dynamodb</module>
<module>spring-cloud-aws-kinesis</module>
<module>spring-cloud-aws-kinesis-stream-binder</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer</module>
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-client</module>
Expand Down
20 changes: 19 additions & 1 deletion spring-cloud-aws-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
<kcl.version>3.1.2</kcl.version>
<kpl.version>1.0.4</kpl.version>
<amazon.dax.version>2.0.6</amazon.dax.version>
<dynamodb-streams.version>2.0.1</dynamodb-streams.version>
<amazon.encryption.s3.version>3.3.5</amazon.encryption.s3.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<spring-cloud-commons.version>5.0.0-RC1</spring-cloud-commons.version>
<spring-cloud-stream.version>5.0.0</spring-cloud-stream.version>
<jakarta.mail.version>2.1.3</jakarta.mail.version>
<eclipse.jakarta.mail.version>2.0.3</eclipse.jakarta.mail.version>
<spring-modulith.version>2.0.0-RC1</spring-modulith.version>
Expand All @@ -54,6 +56,13 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.springframework.modulith</groupId>
Expand Down Expand Up @@ -118,6 +127,11 @@
<artifactId>spring-cloud-aws-kinesis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-kinesis-stream-binder</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-integration-kinesis</artifactId>
Expand Down Expand Up @@ -281,7 +295,11 @@
<artifactId>amazon-kinesis-producer</artifactId>
<version>${kpl.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
<version>${dynamodb-streams.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
106 changes: 106 additions & 0 deletions spring-cloud-aws-kinesis-stream-binder/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>4.0.0-M1</version>
</parent>

<artifactId>spring-cloud-aws-kinesis-stream-binder</artifactId>
<name>Spring Cloud Stream AWS Kinesis Binder</name>

<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-kinesis</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-micrometer-tracing-brave</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-micrometer-tracing-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.opentelemetry</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.wavefront</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.kinesis.stream.binder;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.health.contributor.Health;
import org.springframework.boot.health.contributor.HealthIndicator;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;

/**
* @author Artem Bilan
*
* @since 4.0
*/
public class KinesisBinderHealthIndicator implements HealthIndicator {

private final KinesisMessageChannelBinder kinesisMessageChannelBinder;

public KinesisBinderHealthIndicator(KinesisMessageChannelBinder kinesisMessageChannelBinder) {
this.kinesisMessageChannelBinder = kinesisMessageChannelBinder;
}

@Override
public Health health() {
KinesisAsyncClient amazonKinesis = this.kinesisMessageChannelBinder.getAmazonKinesis();
List<String> streamsInUse = new ArrayList<>(this.kinesisMessageChannelBinder.getStreamsInUse());
for (String stream : streamsInUse) {
while (true) {
try {
amazonKinesis.listShards(request -> request.streamName(stream).maxResults(1)).join();
break;
}
catch (CompletionException ex) {
Throwable cause = ex.getCause();
if (cause instanceof LimitExceededException) {
try {
TimeUnit.SECONDS.sleep(1);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Health.down().withException(ex).build();
}
}
else {
return Health.down().withException(ex).build();
}
}
}
}
return Health.up().build();
}

}
Loading
Loading