Skip to content

Commit 6b26ecd

Browse files
committed
Refactor Kafka consumer code and add actuator endpoint
This commit refactors the code in the Kafka consumer module and adds an actuator endpoint. The changes include: - Importing the required dependencies for the actuator and express - Subscribing to the Kafka topic "delete-child" from the beginning - Parsing and processing the received Kafka messages based on the resource type - Adding a new actuator endpoint at the root ("/") to welcome users
1 parent 3f986eb commit 6b26ecd

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

apps/kafka-consumer/src/index.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
const { Kafka, logLevel } = require("kafkajs");
22
const { postDeleteProject } = require("./service/delete-child");
3+
const actuator = require("express-actuator");
4+
const express = require("express");
5+
36
import { KafkaMessage } from "@repo/backend/lib/types";
47
import { Response } from "express";
5-
const actuator = require('express-actuator');
6-
const express = require("express");
78

89
const kafka = new Kafka({
910
brokers: [process.env.KAFKA_BROKER!],
@@ -23,7 +24,10 @@ const run = async () => {
2324
await consumer.connect().then(() => console.log("Connected"));
2425

2526
await consumer
26-
.subscribe({ topic: process.env.KAFKA_TOPIC || "delete-child", fromBeginning: true })
27+
.subscribe({
28+
topic: process.env.KAFKA_TOPIC || "delete-child",
29+
fromBeginning: true,
30+
})
2731
.then(() => console.log("Subscribed to topic"));
2832

2933
await consumer.run({
@@ -38,12 +42,13 @@ const run = async () => {
3842
if (!message.value?.keys) return;
3943

4044
const messageValue: KafkaMessage = JSON.parse(message.value.toString());
45+
const { id, resource } = messageValue;
4146

42-
console.log(messageValue.id);
47+
if (!id || !resource) return;
4348

44-
switch (messageValue.resource) {
49+
switch (resource) {
4550
case "project":
46-
postDeleteProject(messageValue.id);
51+
postDeleteProject(id);
4752
break;
4853

4954
default:
@@ -60,7 +65,7 @@ const port = process.env.PORT || 8080;
6065

6166
app.use(actuator());
6267

63-
app.get("/", (_, res:Response) => {
68+
app.get("/", (_: any, res: Response) => {
6469
res.send("Welcome to the Projectify Kaka consumer!");
6570
});
6671

0 commit comments

Comments
 (0)