Skip to content

Commit 51ebf40

Browse files
rohan mukeshartembilan
authored andcommitted
Add Docs for Redis Stream Channel adapters
* Fix JavaDoc for the `ReactiveRedisStreamMessageProducer.setExtractPayload()`
1 parent 80d3c67 commit 51ebf40

File tree

2 files changed

+92
-13
lines changed

2 files changed

+92
-13
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void setReadOffset(ReadOffset readOffset) {
105105
}
106106

107107
/**
108-
* Configure this channel adapter to extract or not the message payload.
108+
* Configure this channel adapter to extract or not value from the {@link Record}.
109109
* @param extractPayload default true
110110
*/
111111
public void setExtractPayload(boolean extractPayload) {

src/reference/asciidoc/redis.adoc

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ For example, if the store is written to using a Redis store outbound adapter tha
507507
----
508508
====
509509
510-
Th `RedisTemplate` uses `String` serializers for keys and hash keys and the default JDK Serialization serializers for values and hash values.
510+
The `RedisTemplate` uses `String` serializers for keys and hash keys and the default JDK Serialization serializers for values and hash values.
511511
=====
512512

513513
Because it has a literal value for the `key`, the preceding example is relatively simple and static.
@@ -769,6 +769,96 @@ IMPORTANT: The `task-executor` has to be configured with more than one thread fo
769769
The `errorChannel` can be used to process those errors, to avoid restarts, but it preferable to not expose your application to the possible deadlock situation.
770770
See Spring Framework https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#scheduling-task-executor-types[Reference Manual] for possible `TaskExecutor` implementations.
771771

772+
[[redis-stream-outbound]]
773+
=== Redis Stream Outbound Channel Adapter
774+
775+
Spring Integration 5.4 introduced Reactive Redis Stream outbound channel adapter to write Message payload into Redis stream.
776+
Outbound Channel adapter uses `ReactiveStreamOperations.add(...)` to add a `Record` to the stream.
777+
The following example shows how to use Java configuration and Service class for Redis Stream Outbound Channel Adapter.
778+
779+
====
780+
[source,java]
781+
----
782+
@Bean
783+
@ServiceActivator(inputChannel = "messageChannel")
784+
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
785+
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
786+
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
787+
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); <1>
788+
reactiveRedisStreamMessageHandler.setSerializationContext(serializationContext); <2>
789+
reactiveRedisStreamMessageHandler.setHashMapper(hashMapper); <3>
790+
reactiveRedisStreamMessageHandler.setExtractPayload(true); <4>
791+
}
792+
----
793+
<1> Construct an instance of `ReactiveRedisStreamMessageHandler` using `ReactiveRedisConnectionFactory` and stream name to add records.
794+
Another constructor variant is based on a SpEL expression to evaluate a stream key against a request message.
795+
<2> Set a `RedisSerializationContext` used to serialize a record key and value before adding to the stream.
796+
<3> Set `HashMapper` which provides contract between Java types and Redis hashes/maps.
797+
<4> If 'true', channel adapter will extract payload from a request message for a stream record to add.
798+
Or use the whole message as a value.
799+
It defaults to `true`.
800+
====
801+
802+
[[redis-stream-inbound]]
803+
=== Redis Stream Inbound Channel Adapter
804+
805+
Spring Integration 5.4 introduced the Reactive Stream inbound channel adapter for reading messages from a Redis Stream.
806+
Inbound channel adapter uses `StreamReceiver.receive(...)` or `StreamReceiver.receiveAutoAck()` based on auto acknowledgement flag to read record from Redis stream.
807+
The following example shows how to use Java configuration for Redis Stream Inbound Channel Adapter.
808+
809+
====
810+
[source,java]
811+
----
812+
@Bean
813+
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
814+
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
815+
ReactiveRedisStreamMessageProducer messageProducer =
816+
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); <1>
817+
messageProducer.setStreamReceiverOptions( <2>
818+
StreamReceiver.StreamReceiverOptions.builder()
819+
.pollTimeout(Duration.ofMillis(100))
820+
.build());
821+
messageProducer.setAutoStartup(true); <3>
822+
messageProducer.setAutoAck(false); <4>
823+
messageProducer.setCreateConsumerGroup(true); <5>
824+
messageProducer.setConsumerGroup("my-group"); <6>
825+
messageProducer.setConsumerName("my-consumer"); <7>
826+
messageProducer.setOutputChannel(fromRedisStreamChannel); <8>
827+
messageProducer.setReadOffset(ReadOffset.latest()); <9>
828+
messageProducer.extractPayload(true); <10>
829+
return messageProducer;
830+
}
831+
----
832+
<1> Construct an instance of `ReactiveRedisStreamMessageProducer` using `ReactiveRedisConnectionFactory` and stream key to read records.
833+
<2> A `StreamReceiver.StreamReceiverOptions` to consume redis stream using reactive infrastructure.
834+
<3> A `SmartLifecycle` attribute to specify whether this endpoint should start automatically after the application context start or not.
835+
It defaults to `true`.
836+
If `false`, `RedisStreamMessageProducer` should be started manually `messageProducer.start()`.
837+
<4> If `false`, received messages are not auto acknowledged.
838+
The acknowledgement of the message will be deferred to the client consuming message.
839+
It defaults to `true`.
840+
<5> If `true`, a consumer group will be created.
841+
During creation of consumer group stream will be created (if not exists yet), too.
842+
Consumer group track message delivery and distinguish between consumers.
843+
It defaults to `false`.
844+
<6> Set Consumer Group name.
845+
It defaults to the defined bean name.
846+
<7> Set Consumer name.
847+
Reads message as `my-consumer` from group `my-group`.
848+
<8> The message channel to which to send messages from this endpoint.
849+
<9> Define the offset to read message.
850+
It defaults to `ReadOffset.latest()`.
851+
<10> If 'true', channel adapter will extract payload value from the `Record`.
852+
Otherwise the whole `Record` is used as a payload.
853+
It defaults to `true`.
854+
====
855+
856+
If the `autoAck` is set to `false`, the `Record` in Redis Stream is not acknowledge automatically by the Redis driver, instead an `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header is added into a message to produce with a `SimpleAcknowledgment` instance as a value.
857+
It is a target integration flow responsibility to call its `acknowledge()` callback whenever the business logic is done for the message based on such a record.
858+
Similar logic is required even when an exception happens during deserialization and `errorChannel` is configured.
859+
So, target error handler must decided to ack or nack such a failed message.
860+
Alongside with `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`, the `ReactiveRedisStreamMessageProducer` also populates these headers into the message to produce: `RedisHeaders.STREAM_KEY`, `RedisHeaders.STREAM_MESSAGE_ID`, `RedisHeaders.CONSUMER_GROUP` and `RedisHeaders.CONSUMER`.
861+
772862
[[redis-lock-registry]]
773863
=== Redis Lock Registry
774864

@@ -789,14 +879,3 @@ However, the resources protected by such a lock may have been compromised, so su
789879
You should set the expiry at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time.
790880

791881
Starting with version 5.0, the `RedisLockRegistry` implements `ExpirableLockRegistry`, which removes locks last acquired more than `age` ago and that are not currently locked.
792-
793-
794-
[[redis-stream-inbound]]
795-
=== Redis Stream Inbound Channel Adapter
796-
797-
TBD
798-
799-
[[redis-stream-outbound]]
800-
=== Redis Stream Outbound Channel Adapter
801-
802-
TBD

0 commit comments

Comments
 (0)