diff --git a/rating-service/pom.xml b/rating-service/pom.xml
index 25862a6..3887788 100644
--- a/rating-service/pom.xml
+++ b/rating-service/pom.xml
@@ -47,6 +47,11 @@
lombok
1.16.16
+
+ org.springframework.kafka
+ spring-kafka
+ 1.2.2.RELEASE
+
org.springframework.boot
spring-boot-starter-test
diff --git a/rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java b/rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java
new file mode 100644
index 0000000..f1bdafb
--- /dev/null
+++ b/rating-service/src/main/java/com/lohika/jclub/rating/service/ApartmentRecordListener.java
@@ -0,0 +1,35 @@
+package com.lohika.jclub.rating.service;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+/**
+ * @author Andriy Levchenko
+ */
+@Slf4j
+@Component
+@EnableKafka
+public class ApartmentRecordListener {
+
+ @Autowired
+ private RatingService ratingService;
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @KafkaListener(topics = "${kafka.topic.boot}")
+ public void receive(ConsumerRecord record) throws IOException {
+ log.info("Received message: " + record);
+ Apartment apartment = objectMapper.readValue(record.value(), Apartment.class);
+ BigDecimal rating = ratingService.calculateRating(apartment);
+ log.info("Rating : {}", rating);
+ }
+}
diff --git a/rating-service/src/main/resources/application.yml b/rating-service/src/main/resources/application.yml
new file mode 100644
index 0000000..f8f441c
--- /dev/null
+++ b/rating-service/src/main/resources/application.yml
@@ -0,0 +1,9 @@
+spring:
+ kafka:
+ consumer:
+ auto-offset-reset: earliest
+ group-id: boot
+
+kafka:
+ topic:
+ boot: apartments
\ No newline at end of file
diff --git a/storage-service/pom.xml b/storage-service/pom.xml
index 230df06..059cf79 100644
--- a/storage-service/pom.xml
+++ b/storage-service/pom.xml
@@ -55,6 +55,11 @@
spring-boot-starter-test
test
+
+ org.springframework.kafka
+ spring-kafka
+ 1.2.2.RELEASE
+
diff --git a/storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java b/storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java
new file mode 100644
index 0000000..b8e8677
--- /dev/null
+++ b/storage-service/src/main/java/com/lohika/jclub/storage/service/ApartmentRepositoryEventListener.java
@@ -0,0 +1,36 @@
+package com.lohika.jclub.storage.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.rest.core.event.AbstractRepositoryEventListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author Andriy Levchenko
+ */
+@Slf4j
+@Component
+public class ApartmentRepositoryEventListener extends AbstractRepositoryEventListener {
+
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ protected void onAfterCreate(ApartmentRecord apartmentRecord) {
+ log.info("Creating message for apartmentRecord " + apartmentRecord.toString());
+ String json;
+ try {
+ json = objectMapper.writeValueAsString(apartmentRecord);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Error", e);
+ }
+ kafkaTemplate.send("apartments", json);
+ log.info("Message sent for apartmentRecord " + apartmentRecord.toString());
+ }
+}