@@ -5,20 +5,18 @@ import com.example.kotlin.chat.asRendered
55import com.example.kotlin.chat.mapToViewModel
66import com.example.kotlin.chat.repository.MessageRepository
77import kotlinx.coroutines.ExperimentalCoroutinesApi
8- import kotlinx.coroutines.channels.BroadcastChannel
9- import kotlinx.coroutines.channels.Channel
108import kotlinx.coroutines.flow.Flow
9+ import kotlinx.coroutines.flow.MutableSharedFlow
1110import kotlinx.coroutines.flow.map
1211import kotlinx.coroutines.flow.onEach
13- import kotlinx.coroutines.flow.receiveAsFlow
1412import kotlinx.coroutines.flow.collect
1513import org.springframework.stereotype.Service
1614
1715@Service
1816@ExperimentalCoroutinesApi
1917class PersistentMessageService (val messageRepository : MessageRepository ) : MessageService {
2018
21- val sender: BroadcastChannel <MessageVM > = BroadcastChannel ( Channel . BUFFERED )
19+ val sender: MutableSharedFlow <MessageVM > = MutableSharedFlow ( )
2220
2321 override fun latest (): Flow <MessageVM > =
2422 messageRepository.findLatest()
@@ -28,12 +26,12 @@ class PersistentMessageService(val messageRepository: MessageRepository) : Messa
2826 messageRepository.findLatest(messageId)
2927 .mapToViewModel()
3028
31- override fun stream (): Flow <MessageVM > = sender.openSubscription().receiveAsFlow()
29+ override fun stream (): Flow <MessageVM > = sender
3230
3331 override suspend fun post (messages : Flow <MessageVM >) =
3432 messages
35- .onEach { sender.send (it.asRendered()) }
36- .map { it.asDomainObject() }
33+ .onEach { sender.emit (it.asRendered()) }
34+ .map { it.asDomainObject() }
3735 .let { messageRepository.saveAll(it) }
3836 .collect()
3937}
0 commit comments