Skip to content

Commit d1db4fc

Browse files
committed
Add Kinesis Binder for Spring Cloud Stream
* Introduce `spring-cloud-aws-kinesis-stream-binder` module (the name could be changed) * Mention it in the root POM * Manage it as a dependency * Also, manage `spring-cloud-stream-dependencies` * Add docs, include an image of Binder architecture
1 parent 8672e55 commit d1db4fc

28 files changed

+3862
-0
lines changed
13.2 KB
Loading

docs/src/main/asciidoc/index.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ include::cloudwatch.adoc[]
160160

161161
include::spring-modulith.adoc[]
162162

163+
include::kinesis-stream-binder.adoc[]
164+
163165
include::testing.adoc[]
164166

165167
include::docker-compose.adoc[]

docs/src/main/asciidoc/kinesis-stream-binder.adoc

Lines changed: 506 additions & 0 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
<module>spring-cloud-aws-sqs</module>
4646
<module>spring-cloud-aws-dynamodb</module>
4747
<module>spring-cloud-aws-kinesis</module>
48+
<module>spring-cloud-aws-kinesis-stream-binder</module>
4849
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis</module>
4950
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer</module>
5051
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-client</module>

spring-cloud-aws-dependencies/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<amazon.encryption.s3.version>3.3.5</amazon.encryption.s3.version>
3232
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
3333
<spring-cloud-commons.version>5.0.0-M1</spring-cloud-commons.version>
34+
<spring-cloud-stream.version>5.0.0-M1</spring-cloud-stream.version>
3435
<jakarta.mail.version>2.1.3</jakarta.mail.version>
3536
<eclipse.jakarta.mail.version>2.0.3</eclipse.jakarta.mail.version>
3637
<spring-modulith.version>2.0.0-M2</spring-modulith.version>
@@ -54,6 +55,13 @@
5455
<type>pom</type>
5556
<scope>import</scope>
5657
</dependency>
58+
<dependency>
59+
<groupId>org.springframework.cloud</groupId>
60+
<artifactId>spring-cloud-stream-dependencies</artifactId>
61+
<version>${spring-cloud-stream.version}</version>
62+
<type>pom</type>
63+
<scope>import</scope>
64+
</dependency>
5765

5866
<dependency>
5967
<groupId>org.springframework.modulith</groupId>
@@ -118,6 +126,11 @@
118126
<artifactId>spring-cloud-aws-kinesis</artifactId>
119127
<version>${project.version}</version>
120128
</dependency>
129+
<dependency>
130+
<groupId>io.awspring.cloud</groupId>
131+
<artifactId>spring-cloud-aws-kinesis-stream-binder</artifactId>
132+
<version>${project.version}</version>
133+
</dependency>
121134
<dependency>
122135
<groupId>io.awspring.cloud</groupId>
123136
<artifactId>spring-cloud-aws-starter-integration-kinesis</artifactId>
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>io.awspring.cloud</groupId>
8+
<artifactId>spring-cloud-aws</artifactId>
9+
<version>4.0.0-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>spring-cloud-aws-kinesis-stream-binder</artifactId>
13+
<name>Spring Cloud Stream AWS Kinesis Binder</name>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>io.awspring.cloud</groupId>
18+
<artifactId>spring-cloud-aws-starter</artifactId>
19+
</dependency>
20+
<dependency>
21+
<groupId>io.awspring.cloud</groupId>
22+
<artifactId>spring-cloud-aws-kinesis</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>io.awspring.cloud</groupId>
26+
<artifactId>spring-cloud-aws-dynamodb</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>software.amazon.awssdk</groupId>
30+
<artifactId>kinesis</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>software.amazon.kinesis</groupId>
34+
<artifactId>amazon-kinesis-client</artifactId>
35+
</dependency>
36+
<dependency>
37+
<groupId>software.amazon.kinesis</groupId>
38+
<artifactId>amazon-kinesis-producer</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.springframework.cloud</groupId>
42+
<artifactId>spring-cloud-stream</artifactId>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.springframework.boot</groupId>
47+
<artifactId>spring-boot-configuration-processor</artifactId>
48+
<optional>true</optional>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.springframework.boot</groupId>
52+
<artifactId>spring-boot-starter-actuator</artifactId>
53+
<optional>true</optional>
54+
</dependency>
55+
56+
<dependency>
57+
<groupId>org.springframework.cloud</groupId>
58+
<artifactId>spring-cloud-stream-test-support</artifactId>
59+
<scope>test</scope>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>io.micrometer</groupId>
64+
<artifactId>micrometer-tracing-integration-test</artifactId>
65+
<scope>test</scope>
66+
<exclusions>
67+
<exclusion>
68+
<groupId>io.opentelemetry</groupId>
69+
<artifactId>*</artifactId>
70+
</exclusion>
71+
<exclusion>
72+
<groupId>com.wavefront</groupId>
73+
<artifactId>*</artifactId>
74+
</exclusion>
75+
<exclusion>
76+
<groupId>io.micrometer</groupId>
77+
<artifactId>micrometer-tracing-bridge-otel</artifactId>
78+
</exclusion>
79+
</exclusions>
80+
</dependency>
81+
<dependency>
82+
<groupId>org.testcontainers</groupId>
83+
<artifactId>junit-jupiter</artifactId>
84+
<scope>test</scope>
85+
</dependency>
86+
<dependency>
87+
<groupId>org.testcontainers</groupId>
88+
<artifactId>localstack</artifactId>
89+
<scope>test</scope>
90+
</dependency>
91+
</dependencies>
92+
93+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.stream.binder;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.concurrent.CompletionException;
21+
import java.util.concurrent.TimeUnit;
22+
import org.springframework.boot.health.contributor.Health;
23+
import org.springframework.boot.health.contributor.HealthIndicator;
24+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
25+
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
26+
27+
/**
28+
* @author Artem Bilan
29+
*
30+
* @since 2.0
31+
*/
32+
public class KinesisBinderHealthIndicator implements HealthIndicator {
33+
34+
private final KinesisMessageChannelBinder kinesisMessageChannelBinder;
35+
36+
public KinesisBinderHealthIndicator(KinesisMessageChannelBinder kinesisMessageChannelBinder) {
37+
this.kinesisMessageChannelBinder = kinesisMessageChannelBinder;
38+
}
39+
40+
@Override
41+
public Health health() {
42+
KinesisAsyncClient amazonKinesis = this.kinesisMessageChannelBinder.getAmazonKinesis();
43+
List<String> streamsInUse = new ArrayList<>(this.kinesisMessageChannelBinder.getStreamsInUse());
44+
for (String stream : streamsInUse) {
45+
while (true) {
46+
try {
47+
amazonKinesis.listShards(request -> request.streamName(stream).maxResults(1)).join();
48+
break;
49+
}
50+
catch (CompletionException ex) {
51+
Throwable cause = ex.getCause();
52+
if (cause instanceof LimitExceededException) {
53+
try {
54+
TimeUnit.SECONDS.sleep(1);
55+
}
56+
catch (InterruptedException e) {
57+
Thread.currentThread().interrupt();
58+
return Health.down().withException(ex).build();
59+
}
60+
}
61+
else {
62+
return Health.down().withException(ex).build();
63+
}
64+
}
65+
}
66+
}
67+
return Health.up().build();
68+
}
69+
70+
}

0 commit comments

Comments
 (0)