Skip to content

Commit 8672e55

Browse files
authored
Add Spring Integration support with KCL (#1496)
1 parent 2151d3b commit 8672e55

File tree

12 files changed

+1605
-12
lines changed

12 files changed

+1605
-12
lines changed

docs/src/main/asciidoc/kinesis.adoc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,40 @@ MessageHandler kplMessageHandler(KinesisProducer kinesisProducer, Schema schema)
148148
}
149149
----
150150

151+
The `KclMessageDrivenChannelAdapter` is a `MessageProducerSupport` implementation to perform record consumption from the Kinesis stream(s) using https://docs.aws.amazon.com/streams/latest/dev/kcl.html[Kinesis Client Library (KCL)].
152+
The configuration and behavior are similar to the `KinesisMessageDrivenChannelAdapter` described above.
153+
This class also exposes a handful of options to configure an internal `Scheduler` instance from KCL where the real work is delegated to.
154+
Therefore, a `CloudWatchAsyncClient` and `DynamoDbAsyncClient` can also be injected into the `KclMessageDrivenChannelAdapter`.
155+
They are used by the KCL for checkpointing and `Kinesis` consumers coordination.
156+
By default, this channel adapter relies on the https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html[fan-out] retrieval strategy.
157+
158+
The configuration of the `KclMessageDrivenChannelAdapter` is like following:
159+
160+
[source,java]
161+
----
162+
@Bean
163+
public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter(KinesisAsyncClient kinesisClient,
164+
CloudWatchAsyncClient cloudWatchClient, DynamoDbAsyncClient dynamoDBClient,
165+
PollableChannel kinesisReceiveChannel) {
166+
167+
KclMessageDrivenChannelAdapter adapter = new KclMessageDrivenChannelAdapter(kinesisClient, cloudWatchClient,
168+
dynamoDBClient, "stream1", "stream2");
169+
adapter.setOutputChannel(kinesisReceiveChannel);
170+
adapter.setStreamInitialSequence(InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
171+
adapter.setConverter(String::new);
172+
adapter.setConsumerGroup("multi_stream_group");
173+
adapter.setMetricsLevel(MetricsLevel.NONE);
174+
adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig ->
175+
leaseManagementConfig.workerUtilizationAwareAssignmentConfig().disableWorkerMetrics(true));
176+
return adapter;
177+
}
178+
----
179+
180+
NOTE: Unlike `KinesisMessageDrivenChannelAdapter`, the `KclMessageDrivenChannelAdapter` does not support explicit shard assignments.
181+
151182
=== Spring Integration Starters
152183

153184
The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
154185
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
155186
The `spring-cloud-aws-starter-integration-kinesis-producer` artifact is dedicated for dependencies related to the Kinesis Producer Library.
187+
The `spring-cloud-aws-starter-integration-kinesis-client` artifact is dedicated for dependencies related to the Kinesis Client Library.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
<module>spring-cloud-aws-kinesis</module>
4848
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis</module>
4949
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer</module>
50+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-client</module>
5051
<module>spring-cloud-aws-s3</module>
5152
<module>spring-cloud-aws-testcontainers</module>
5253
<module>spring-cloud-aws-starters/spring-cloud-aws-starter</module>

spring-cloud-aws-dependencies/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@
128128
<artifactId>spring-cloud-aws-starter-integration-kinesis-producer</artifactId>
129129
<version>${project.version}</version>
130130
</dependency>
131+
<dependency>
132+
<groupId>io.awspring.cloud</groupId>
133+
<artifactId>spring-cloud-aws-starter-integration-kinesis-client</artifactId>
134+
<version>${project.version}</version>
135+
</dependency>
131136

132137
<dependency>
133138
<groupId>io.awspring.cloud</groupId>

spring-cloud-aws-kinesis/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
</dependency>
2121
<dependency>
2222
<groupId>software.amazon.kinesis</groupId>
23-
<artifactId>amazon-kinesis-client</artifactId>
23+
<artifactId>amazon-kinesis-producer</artifactId>
2424
<optional>true</optional>
2525
</dependency>
2626
<dependency>
2727
<groupId>software.amazon.kinesis</groupId>
28-
<artifactId>amazon-kinesis-producer</artifactId>
28+
<artifactId>amazon-kinesis-client</artifactId>
2929
<optional>true</optional>
3030
</dependency>
3131
<dependency>

0 commit comments

Comments
 (0)