Skip to content

Commit 6a3951f

Browse files
committed
Part5: RSocket
1 parent e4e2f03 commit 6a3951f

File tree

10 files changed

+239
-134
lines changed

10 files changed

+239
-134
lines changed

build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222
implementation("org.springframework.boot:spring-boot-starter-thymeleaf")
2323
implementation("org.springframework.boot:spring-boot-starter-webflux")
2424
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
25+
implementation("org.springframework.boot:spring-boot-starter-rsocket")
2526
implementation("io.r2dbc:r2dbc-h2")
2627

2728
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
@@ -34,6 +35,7 @@ dependencies {
3435
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
3536

3637
testImplementation("org.springframework.boot:spring-boot-starter-test")
38+
testImplementation("app.cash.turbine:turbine:0.3.0")
3739

3840
runtimeOnly("com.h2database:h2")
3941

src/main/kotlin/com/example/kotlin/chat/Extensions.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import com.example.kotlin.chat.repository.ContentType
44
import com.example.kotlin.chat.repository.Message
55
import com.example.kotlin.chat.service.MessageVM
66
import com.example.kotlin.chat.service.UserVM
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.map
79
import org.intellij.markdown.flavours.commonmark.CommonMarkFlavourDescriptor
810
import org.intellij.markdown.html.HtmlGenerator
911
import org.intellij.markdown.parser.MarkdownParser
@@ -24,6 +26,10 @@ fun Message.asViewModel(): MessageVM = MessageVM(
2426
sent,
2527
id
2628
)
29+
fun MessageVM.asRendered(contentType: ContentType = ContentType.MARKDOWN): MessageVM =
30+
this.copy(content = contentType.render(this.content))
31+
32+
fun Flow<Message>.mapToViewModel(): Flow<MessageVM> = map { it.asViewModel() }
2733

2834
fun List<Message>.mapToViewModel(): List<MessageVM> = map { it.asViewModel() }
2935

src/main/kotlin/com/example/kotlin/chat/controller/HtmlController.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,10 @@ import org.springframework.ui.set
99
import org.springframework.web.bind.annotation.GetMapping
1010

1111
@Controller
12-
class HtmlController(val messageService: MessageService) {
12+
class HtmlController() {
1313

1414
@GetMapping("/")
15-
suspend fun index(model: Model): String {
16-
val messages: List<MessageVM> = messageService.latest()
17-
18-
model["messages"] = messages
19-
model["lastMessageId"] = messages.lastOrNull()?.id ?: ""
20-
15+
fun index(): String {
2116
return "chat"
2217
}
2318
}

src/main/kotlin/com/example/kotlin/chat/controller/MessageResource.kt

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,23 @@ import com.example.kotlin.chat.service.MessageVM
55
import kotlinx.coroutines.flow.Flow
66
import kotlinx.coroutines.flow.emitAll
77
import kotlinx.coroutines.flow.onStart
8-
import org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE
9-
import org.springframework.http.ResponseEntity
10-
import org.springframework.web.bind.annotation.*
8+
import org.springframework.messaging.handler.annotation.MessageMapping
9+
import org.springframework.messaging.handler.annotation.Payload
10+
import org.springframework.stereotype.Controller
1111

12-
@RestController
13-
@RequestMapping("/api/v1/messages")
12+
@Controller
13+
@MessageMapping("api.v1.messages")
1414
class MessageResource(val messageService: MessageService) {
1515

16-
@GetMapping
17-
suspend fun latest(@RequestParam(value = "lastMessageId", defaultValue = "") lastMessageId: String): ResponseEntity<List<MessageVM>> {
18-
val messages = if (lastMessageId.isNotEmpty()) {
19-
messageService.after(lastMessageId)
20-
} else {
21-
messageService.latest()
22-
}
16+
@MessageMapping("stream")
17+
suspend fun receiveStream(@Payload inboundMessages: Flow<MessageVM>) =
18+
messageService.post(inboundMessages)
2319

24-
return if (messages.isEmpty()) {
25-
with(ResponseEntity.noContent()) {
26-
header("lastMessageId", lastMessageId)
27-
build<List<MessageVM>>()
28-
}
29-
} else {
30-
with(ResponseEntity.ok()) {
31-
header("lastMessageId", messages.last().id)
32-
body(messages)
33-
}
20+
@MessageMapping("stream")
21+
suspend fun sendStream(): Flow<MessageVM> = messageService
22+
.stream()
23+
.onStart {
24+
emitAll(messageService.latest())
3425
}
35-
}
3626

37-
@PostMapping
38-
suspend fun post(@RequestBody message: MessageVM) {
39-
messageService.post(message)
40-
}
4127
}

src/main/kotlin/com/example/kotlin/chat/repository/MessageRepository.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ interface MessageRepository : CoroutineCrudRepository<Message, String> {
1515
LIMIT 10
1616
) ORDER BY "SENT"
1717
""")
18-
suspend fun findLatest(): List<Message>
18+
fun findLatest(): Flow<Message>
1919

2020
// language=SQL
2121
@Query("""
@@ -25,5 +25,5 @@ interface MessageRepository : CoroutineCrudRepository<Message, String> {
2525
ORDER BY "SENT" DESC
2626
) ORDER BY "SENT"
2727
""")
28-
suspend fun findLatest(@Param("id") id: String): List<Message>
28+
fun findLatest(@Param("id") id: String): Flow<Message>
2929
}
Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package com.example.kotlin.chat.service
22

3+
import kotlinx.coroutines.flow.Flow
4+
35
interface MessageService {
46

5-
suspend fun latest(): List<MessageVM>
7+
fun latest(): Flow<MessageVM>
8+
9+
fun after(messageId: String): Flow<MessageVM>
610

7-
suspend fun after(messageId: String): List<MessageVM>
11+
fun stream(): Flow<MessageVM>
812

9-
suspend fun post(message: MessageVM)
13+
suspend fun post(messages: Flow<MessageVM>)
1014
}
Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,39 @@
11
package com.example.kotlin.chat.service
22

33
import com.example.kotlin.chat.asDomainObject
4+
import com.example.kotlin.chat.asRendered
45
import com.example.kotlin.chat.mapToViewModel
56
import com.example.kotlin.chat.repository.MessageRepository
7+
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.channels.BroadcastChannel
9+
import kotlinx.coroutines.channels.Channel
10+
import kotlinx.coroutines.flow.Flow
11+
import kotlinx.coroutines.flow.map
12+
import kotlinx.coroutines.flow.onEach
13+
import kotlinx.coroutines.flow.receiveAsFlow
14+
import kotlinx.coroutines.flow.collect
615
import org.springframework.stereotype.Service
716

817
@Service
18+
@ExperimentalCoroutinesApi
919
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {
1020

11-
override suspend fun latest(): List<MessageVM> =
21+
val sender: BroadcastChannel<MessageVM> = BroadcastChannel(Channel.BUFFERED)
22+
23+
override fun latest(): Flow<MessageVM> =
1224
messageRepository.findLatest()
1325
.mapToViewModel()
1426

15-
override suspend fun after(messageId: String): List<MessageVM> =
27+
override fun after(messageId: String): Flow<MessageVM> =
1628
messageRepository.findLatest(messageId)
1729
.mapToViewModel()
1830

19-
override suspend fun post(message: MessageVM) {
20-
messageRepository.save(message.asDomainObject())
21-
}
31+
override fun stream(): Flow<MessageVM> = sender.openSubscription().receiveAsFlow()
32+
33+
override suspend fun post(messages: Flow<MessageVM>) =
34+
messages
35+
.onEach { sender.send(it.asRendered()) }
36+
.map { it.asDomainObject() }
37+
.let { messageRepository.saveAll(it) }
38+
.collect()
2239
}
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
1-
spring.r2dbc.url=r2dbc:h2:file:///./build/data/testdb;USER=sa;PASSWORD=password
1+
spring.r2dbc.url=r2dbc:h2:file:///./build/data/testdb;USER=sa;PASSWORD=password
2+
3+
spring.rsocket.server.transport=websocket
4+
spring.rsocket.server.mapping-path=/rsocket

src/main/resources/templates/chat.html

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323
crossorigin="anonymous"></script>
2424
<script src='https://cdn.jsdelivr.net/lodash/4.17.4/lodash.min.js'></script>
2525
<script src='https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.18.1/moment-with-locales.min.js'></script>
26+
27+
<script src='/rsocket-types.js'></script>
28+
<script src='/rsocket-flowable.js'></script>
29+
<script src='/rsocket-core.js'></script>
30+
<script src='/rsocket-websocket-client.js'></script>
31+
2632
<style>
2733
.chats-row div {
2834
height: 50%;
@@ -134,33 +140,74 @@
134140
var messageButton = $('.chat-input button');
135141
var messageInput = $('.chat-input textarea');
136142

137-
messageButton.on("click", function() {
138-
var content = messageInput.val();
139-
messageInput.val("");
140-
if (content) {
141-
$.post({
142-
url: "/api/v1/messages",
143-
data: JSON.stringify({content: content , user: user, sent: new Date().toISOString()}),
144-
contentType: 'application/json'
145-
});
146-
}
147-
})
148-
149143
chatArea.animate({scrollTop: mediaList.height()}, 1);
150-
setInterval(function () {
151-
$.get("/api/v1/messages", {lastMessageId: lastMessageId}).done(function (data, status, request) {
152-
if (data) {
153-
data.forEach(function (v) {
154-
mediaList.append(messageTemplate(v));
144+
145+
const client = new rsocketCore.RSocketClient({
146+
transport: new rsocketWebSocketClient(
147+
{
148+
url: 'ws://localhost:8080/rsocket',
149+
},
150+
rsocketCore.BufferEncoders,
151+
),
152+
setup: {
153+
dataMimeType: 'application/json',
154+
metadataMimeType: rsocketCore.MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
155+
keepAlive: 5000,
156+
lifetime: 60000,
157+
},
158+
});
159+
160+
client.connect()
161+
.then(rsocket => {
162+
var endpoint = "api.v1.messages.stream";
163+
164+
rsocket.requestChannel(new rsocketFlowable.Flowable(source => {
165+
console.log("channel")
166+
source.onSubscribe({
167+
cancel: () => {},
168+
request: (n) => {}
169+
})
170+
messageButton.on("click", function() {
171+
var content = messageInput.val();
172+
messageInput.val("");
173+
if (content) {
174+
source.onNext({
175+
data: Buffer.from(JSON.stringify({content: content , user: user, sent: new Date().toISOString()})),
176+
metadata: rsocketCore.encodeAndAddWellKnownMetadata(
177+
Buffer.alloc(0),
178+
rsocketCore.MESSAGE_RSOCKET_ROUTING,
179+
Buffer.from(String.fromCharCode(endpoint.length) + endpoint)
180+
)
181+
});
182+
}
183+
});
184+
}))
185+
.subscribe({
186+
onSubscribe: (s) => {
187+
s.request(1000)
188+
}
155189
});
156190

157-
chatArea.stop();
158-
chatArea.animate({scrollTop: mediaList.height()}, 500);
159-
}
191+
rsocket.requestStream({
192+
metadata: rsocketCore.encodeAndAddWellKnownMetadata(
193+
Buffer.alloc(0),
194+
rsocketCore.MESSAGE_RSOCKET_ROUTING,
195+
Buffer.from(String.fromCharCode(endpoint.length) + endpoint)
196+
)
197+
})
198+
.subscribe({
199+
onSubscribe: (s) => {
200+
s.request(1000)
201+
},
202+
onNext: (e) => {
203+
var v = JSON.parse(e.data);
160204

161-
lastMessageId = request.getResponseHeader('lastMessageId');
205+
mediaList.append(messageTemplate(v));
206+
chatArea.stop();
207+
chatArea.animate({scrollTop: mediaList.height()}, 500);
208+
}
209+
});
162210
});
163-
}, 1000);
164211

165212
/*]]>*/
166213
</script>

0 commit comments

Comments
 (0)