@@ -10,37 +10,38 @@ package infrastructure.events
1010
1111import application.controller.MedicalTechnologyController
1212import application.controller.RoomController
13- import application.presenter.event.model.MedicalTechnologyEvent
14- import application.presenter.event.model.MedicalTechnologyEventKey
15- import application.presenter.event.model.RoomEvent
16- import application.presenter.event.model.RoomEventKey
17- import application.presenter.event.model.RoomEventPayloads
18- import application.presenter.event.serialization.EventSerialization.toHumidity
19- import application.presenter.event.serialization.EventSerialization.toLuminosity
20- import application.presenter.event.serialization.EventSerialization.toPresence
21- import application.presenter.event.serialization.EventSerialization.toTemperature
22- import application.service.MedicalTechnologyService
23- import application.service.RoomService
13+ import application.handler.EventHandler
14+ import application.handler.EventHandlers
15+ import application.presenter.event.serialization.EventSerialization.toEvent
2416import com.fasterxml.jackson.databind.ObjectMapper
25- import entity.medicaltechnology.MedicalTechnologyID
26- import entity.zone.RoomEnvironmentalData
27- import entity.zone.RoomID
2817import infrastructure.provider.ManagerProvider
29- import kotlinx.serialization.decodeFromString
30- import kotlinx.serialization.json.Json
18+ import org.apache.kafka.clients.consumer.ConsumerRecord
3119import org.apache.kafka.clients.consumer.KafkaConsumer
3220import java.time.Duration
33- import java.time.Instant
3421import java.time.format.DateTimeParseException
3522
3623/* *
3724 * This class manage the Kafka client needed to consume events.
3825 * @param[provider] the provider of managers.
3926 */
4027class KafkaClient (private val provider : ManagerProvider ) {
28+ private val eventHandlers: List <EventHandler >
29+
4130 init {
4231 checkNotNull(System .getenv(bootstrapServerUrlVariable)) { " kafka bootstrap server url required" }
4332 checkNotNull(System .getenv(schemaRegistryUrlVariable)) { " kafka schema registry url required" }
33+ val roomController = RoomController (provider.roomDigitalTwinManager, provider.roomDatabaseManager)
34+ val medicalTechnologyController = MedicalTechnologyController (
35+ provider.medicalTechnologyDigitalTwinManager,
36+ provider.medicalTechnologyDatabaseManager
37+ )
38+ eventHandlers = listOf (
39+ EventHandlers .TemperatureEventHandler (roomController),
40+ EventHandlers .HumidityEventHandler (roomController),
41+ EventHandlers .LuminosityEventHandler (roomController),
42+ EventHandlers .PresenceEventHandler (roomController),
43+ EventHandlers .MedicalTechnologyEventHandler (medicalTechnologyController)
44+ )
4445 }
4546
4647 private val kafkaConsumer = KafkaConsumer <String , String >(
@@ -58,7 +59,7 @@ class KafkaClient(private val provider: ManagerProvider) {
5859 while (true ) {
5960 kafkaConsumer.poll(Duration .ofMillis(pollingTime)).forEach { event ->
6061 try {
61- consumeEvent(event.key(), ObjectMapper ().writeValueAsString(event.value()) )
62+ consumeEvent(event)
6263 } catch (e: IllegalArgumentException ) {
6364 println (" ERROR: Invalid Event Schema. Event discarded! - $e " )
6465 } catch (e: DateTimeParseException ) {
@@ -69,58 +70,11 @@ class KafkaClient(private val provider: ManagerProvider) {
6970 }
7071 }
7172
72- private fun consumeEvent (eventKey : String , event : String ) {
73- fun updateRoomEnvironmentData (event : RoomEvent <* >, environmentalData : RoomEnvironmentalData ) =
74- RoomService .UpdateRoomEnvironmentData (
75- RoomID (event.roomId),
76- environmentalData,
77- Instant .parse(event.dateTime),
78- RoomController (provider.roomDigitalTwinManager, provider.roomDatabaseManager)
79- ).execute()
80-
81- when (eventKey) {
82- RoomEventKey .TEMPERATURE_EVENT -> {
83- val deserializedEvent = Json .decodeFromString<RoomEvent <RoomEventPayloads .TemperaturePayload >>(event)
84- updateRoomEnvironmentData(
85- deserializedEvent,
86- RoomEnvironmentalData (temperature = deserializedEvent.data.toTemperature())
87- )
88- }
89- RoomEventKey .HUMIDITY_EVENT -> {
90- val deserializedEvent = Json .decodeFromString<RoomEvent <RoomEventPayloads .HumidityPayload >>(event)
91- updateRoomEnvironmentData(
92- deserializedEvent,
93- RoomEnvironmentalData (humidity = deserializedEvent.data.toHumidity())
94- )
95- }
96- RoomEventKey .LUMINOSITY_EVENT -> {
97- val deserializedEvent = Json .decodeFromString<RoomEvent <RoomEventPayloads .LuminosityPayload >>(event)
98- updateRoomEnvironmentData(
99- deserializedEvent,
100- RoomEnvironmentalData (luminosity = deserializedEvent.data.toLuminosity())
101- )
102- }
103- RoomEventKey .PRESENCE_EVENT -> {
104- val deserializedEvent = Json .decodeFromString<RoomEvent <RoomEventPayloads .PresencePayload >>(event)
105- updateRoomEnvironmentData(
106- deserializedEvent,
107- RoomEnvironmentalData (presence = deserializedEvent.data.toPresence())
108- )
109- }
110- MedicalTechnologyEventKey .USAGE_EVENT -> {
111- val deserializedEvent = Json .decodeFromString<MedicalTechnologyEvent >(event)
112- MedicalTechnologyService .UpdateMedicalTechnologyUsage (
113- MedicalTechnologyID (deserializedEvent.data.medicalTechnologyID),
114- deserializedEvent.data.isInUse,
115- Instant .parse(deserializedEvent.dateTime),
116- MedicalTechnologyController (
117- provider.medicalTechnologyDigitalTwinManager,
118- provider.medicalTechnologyDatabaseManager
119- )
120- )
121- }
122- else -> throw IllegalArgumentException (" Event not supported" )
123- }
73+ private fun consumeEvent (event : ConsumerRecord <String , String >) {
74+ val deserializedEvent = ObjectMapper ().writeValueAsString(event.value()).toEvent(event.key())
75+ this @KafkaClient.eventHandlers
76+ .filter { it.canHandle(deserializedEvent) }
77+ .forEach { it.consume(deserializedEvent) }
12478 }
12579
12680 companion object {
0 commit comments