Skip to content

Commit 2dc8702

Browse files
committed
Add Docs for R2DBC Channel adapters
1 parent 51ebf40 commit 2dc8702

File tree

1 file changed

+88
-2
lines changed

1 file changed

+88
-2
lines changed

src/reference/asciidoc/r2dbc.adoc

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,95 @@ compile "org.springframework.integration:spring-integration-r2dbc:{project-versi
2626
[[r2dbc-inbound-channel-adapter]]
2727
=== R2DBC Inbound Channel Adapter
2828

29-
TBD
29+
The `R2dbcMessageSource` is a pollable `MessageSource` implementation based on the `R2dbcEntityOperations` and produces messages with a `Flux` or `Mono` as a payload for data fetched from database according an `expectSingleResult` option.
30+
The query to `SELECT` can be statically provided or based on a SpEL expression which is evaluated on every `receive()` call.
31+
The `R2dbcMessageSource.SelectCreator` is present as a root object for evaluation context to allow to use a `StatementMapper.SelectSpec` fluent API.
32+
By default this channel adapter maps records from the select into a `LinkedCaseInsensitiveMap` instances.
33+
It can be customized providing a `payloadType` options which is used underneath by the `EntityRowMapper` based on the `this.r2dbcEntityOperations.getConverter()`.
34+
The `updateSql` is optional and used to mark read records in the databased for skipping from the subsequent polls.
35+
The `UPDATE` operation can be supplied with a `BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>` to bind values into an `UPDATE` based on records in the `SELECT` result.
36+
37+
A typical configuration for this channel adapter might look like this:
38+
39+
====
40+
[source, java]
41+
----
42+
@Bean
43+
@InboundChannelAdapter("fromR2dbcChannel")
44+
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
45+
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
46+
"SELECT * FROM person WHERE name='Name'");
47+
r2dbcMessageSource.setPayloadType(Person.class);
48+
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
49+
r2dbcMessageSource.setBindFunction(
50+
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
51+
return r2dbcMessageSource;
52+
}
53+
----
54+
====
55+
56+
With Java DSL a configuration for this channel adapter is like this:
57+
58+
====
59+
[source, java]
60+
----
61+
@Bean
62+
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
63+
return IntegrationFlows
64+
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
65+
(selectCreator) ->
66+
selectCreator.createSelect("person")
67+
.withProjection("*")
68+
.withCriteria(Criteria.where("id").is(1)))
69+
.expectSingleResult(true)
70+
.payloadType(Person.class)
71+
.updateSql("UPDATE Person SET id='2' where id = :id")
72+
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
73+
bindSpec.bind("id", o.getId())),
74+
e -> e.poller(p -> p.fixedDelay(100)))
75+
.<Mono<?>>handle((p, h) -> p, e -> e.async(true))
76+
.channel(MessageChannels.flux())
77+
.get();
78+
}
79+
----
80+
====
3081

3182
[[r2dbc-outbound-channel-adapter]]
3283
=== R2DBC Outbound Channel Adapter
3384

34-
TBD
85+
The `R2dbcMessageHandler` is a `ReactiveMessageHandler` implementation to perform an `INSERT` (default), `UPDATE` or `DELETE` query in database using a provided `R2dbcEntityOperations`.
86+
The `R2dbcMessageHandler.Type` can be configured statically or via a SpEL expression against request message.
87+
The query to execute can be based on the `tableName`, `values` and `criteria` expression options or (if `tableName` is not provided) the whole message payload is treated as an `org.springframework.data.relational.core.mapping.Table` entity to perform SQL against.
88+
The package `org.springframework.data.relational.core.query` is registered as an import into a SpEL evaluation context for direct access to the `Criteria` fluent API which is used for `UPDATE` and `DELETE` queries.
89+
The `valuesExpression` is used in the `INSERT` and `UPDATE` and must be evaluated to the `Map` for column-value pairs to perform a change in the target table against request message.
90+
91+
A typical configuration for this channel adapter might look like this:
92+
93+
====
94+
[source, java]
95+
----
96+
@Bean
97+
@ServiceActivator(inputChannel = "toR2dbcChannel")
98+
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
99+
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
100+
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
101+
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
102+
messageHandler.setCriteriaExpression(
103+
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
104+
return messageHandler;
105+
}
106+
----
107+
====
108+
109+
With Java DSL a configuration for this channel adapter is like this:
110+
111+
====
112+
[source, java]
113+
----
114+
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
115+
.queryType(R2dbcMessageHandler.Type.UPDATE)
116+
.tableNameExpression("payload.class.simpleName")
117+
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
118+
.values("{age:36}"))
119+
----
120+
====

0 commit comments

Comments
 (0)