Skip to content

Commit 39234ae

Browse files
irfanh94 (Irfan Hodzic)
and authored
use for await instead of stream.on (#360)
* use for await instead of stream.on * added a comment and consumer.close on test complete * correct tests * increasing version to 0.7.7 --------- Co-authored-by: Irfan Hodzic <irfan.hodzic@Irfans-MacBook-Pro-2.local>
1 parent 3f9efc3 commit 39234ae

File tree

3 files changed

+270
-7
lines changed

3 files changed

+270
-7
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -211,17 +211,28 @@ export abstract class AbstractKafkaConsumer<
211211
)
212212
this.messageBatchStream.on('error', (error) => this.handlerError(error))
213213
} else {
214-
this.consumerStream.on('data', (message) =>
215-
this.consume(
216-
message.topic,
217-
message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
218-
),
219-
)
214+
// biome-ignore lint/style/noNonNullAssertion: consumerStream is always created
215+
const stream = this.consumerStream!
216+
217+
// we are not waiting for the stream to complete
218+
// because init() must return promised void
219+
this.handleSyncStream(stream).catch(this.handlerError)
220220
}
221221

222222
this.consumerStream.on('error', (error) => this.handlerError(error))
223223
}
224224

225+
private async handleSyncStream(
226+
stream: MessagesStream<string, object, string, string>,
227+
): Promise<void> {
228+
for await (const message of stream) {
229+
await this.consume(
230+
message.topic,
231+
message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
232+
)
233+
}
234+
}
235+
225236
async close(): Promise<void> {
226237
if (!this.consumerStream && !this.messageBatchStream) {
227238
// Leaving the group in case consumer joined but streams were not created

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.7.6",
3+
"version": "0.7.7",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

packages/kafka/test/consumer/PermissionConsumer.spec.ts

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { KafkaHandlerConfig, type RequestContext } from '../../lib/index.ts'
77
import { PermissionPublisher } from '../publisher/PermissionPublisher.ts'
88
import {
99
PERMISSION_ADDED_SCHEMA,
10+
PERMISSION_REMOVED_SCHEMA,
1011
type PermissionAdded,
1112
TOPICS,
1213
} from '../utils/permissionSchemas.ts'
@@ -489,4 +490,255 @@ describe('PermissionConsumer', () => {
489490
})
490491
})
491492
})
493+
494+
describe('sync message processing', () => {
495+
let publisher: PermissionPublisher
496+
let consumer: PermissionConsumer | undefined
497+
498+
beforeAll(() => {
499+
publisher = new PermissionPublisher(testContext.cradle)
500+
})
501+
502+
beforeEach(async () => {
503+
// Close and clear previous consumer to avoid message accumulation
504+
if (consumer) {
505+
await consumer.close()
506+
consumer.clear()
507+
}
508+
})
509+
510+
afterAll(async () => {
511+
await publisher.close()
512+
await consumer?.close()
513+
})
514+
515+
it('should process messages one at a time using handleSyncStream', async () => {
516+
// Given - track processing order and timing
517+
const processingOrder: string[] = []
518+
const processingTimestamps: Record<string, { start: number; end: number }> = {}
519+
const testMessageIds = ['sync-1', 'sync-2', 'sync-3']
520+
521+
consumer = new PermissionConsumer(testContext.cradle, {
522+
handlers: {
523+
'permission-added': new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, async (message) => {
524+
// Only track messages from this test
525+
if (!testMessageIds.includes(message.value.id)) {
526+
consumer!.addedMessages.push(message)
527+
return
528+
}
529+
530+
const messageId = message.value.id
531+
processingOrder.push(`start-${messageId}`)
532+
processingTimestamps[messageId] = { start: Date.now(), end: 0 }
533+
534+
// Simulate async work to verify sequential processing
535+
await new Promise((resolve) => setTimeout(resolve, 50))
536+
537+
processingOrder.push(`end-${messageId}`)
538+
processingTimestamps[messageId]!.end = Date.now()
539+
consumer!.addedMessages.push(message)
540+
}),
541+
},
542+
})
543+
544+
await consumer.init()
545+
546+
// When - publish multiple messages at once
547+
await Promise.all([
548+
publisher.publish('permission-added', { id: 'sync-1', type: 'added', permissions: [] }),
549+
publisher.publish('permission-added', { id: 'sync-2', type: 'added', permissions: [] }),
550+
publisher.publish('permission-added', { id: 'sync-3', type: 'added', permissions: [] }),
551+
])
552+
553+
// Then - wait for all messages to be processed
554+
await consumer.handlerSpy.waitForMessageWithId('sync-1', 'consumed')
555+
await consumer.handlerSpy.waitForMessageWithId('sync-2', 'consumed')
556+
await consumer.handlerSpy.waitForMessageWithId('sync-3', 'consumed')
557+
558+
// Verify messages were processed sequentially (one completes before next starts)
559+
expect(processingOrder).toEqual([
560+
'start-sync-1',
561+
'end-sync-1',
562+
'start-sync-2',
563+
'end-sync-2',
564+
'start-sync-3',
565+
'end-sync-3',
566+
])
567+
568+
// Verify each message completes before the next one starts
569+
expect(processingTimestamps['sync-1']!.end).toBeLessThan(
570+
processingTimestamps['sync-2']!.start,
571+
)
572+
expect(processingTimestamps['sync-2']!.end).toBeLessThan(
573+
processingTimestamps['sync-3']!.start,
574+
)
575+
576+
const testMessages = consumer.addedMessages.filter((m) => testMessageIds.includes(m.value.id))
577+
expect(testMessages).toHaveLength(3)
578+
expect(testMessages[0]!.value.id).toBe('sync-1')
579+
expect(testMessages[1]!.value.id).toBe('sync-2')
580+
expect(testMessages[2]!.value.id).toBe('sync-3')
581+
})
582+
583+
it('should process messages in order even when published rapidly', async () => {
584+
// Given
585+
const testMessageIds = ['rapid-1', 'rapid-2', 'rapid-3', 'rapid-4', 'rapid-5']
586+
consumer = new PermissionConsumer(testContext.cradle)
587+
await consumer.init()
588+
589+
// When - publish messages rapidly without waiting
590+
const publishPromises = []
591+
for (let i = 1; i <= 5; i++) {
592+
publishPromises.push(
593+
publisher.publish('permission-added', {
594+
id: `rapid-${i}`,
595+
type: 'added',
596+
permissions: [],
597+
}),
598+
)
599+
}
600+
await Promise.all(publishPromises)
601+
602+
// Then - wait for all messages to be processed
603+
for (let i = 1; i <= 5; i++) {
604+
await consumer.handlerSpy.waitForMessageWithId(`rapid-${i}`, 'consumed')
605+
}
606+
607+
// Verify messages were processed in order
608+
const testMessages = consumer.addedMessages.filter((m) => testMessageIds.includes(m.value.id))
609+
expect(testMessages).toHaveLength(5)
610+
for (let i = 0; i < 5; i++) {
611+
expect(testMessages[i]!.value.id).toBe(`rapid-${i + 1}`)
612+
}
613+
})
614+
615+
it('should ensure previous message completes before next message starts processing', async () => {
616+
// Given - use a handler that takes time and tracks concurrency
617+
let concurrentProcessing = 0
618+
let maxConcurrency = 0
619+
const testMessageIds = ['concurrency-1', 'concurrency-2', 'concurrency-3']
620+
const processedMessages: string[] = []
621+
622+
consumer = new PermissionConsumer(testContext.cradle, {
623+
handlers: {
624+
'permission-added': new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, async (message) => {
625+
// Only track messages from this test
626+
if (!testMessageIds.includes(message.value.id)) {
627+
consumer!.addedMessages.push(message)
628+
return
629+
}
630+
631+
concurrentProcessing++
632+
maxConcurrency = Math.max(maxConcurrency, concurrentProcessing)
633+
634+
// Simulate processing time
635+
await new Promise((resolve) => setTimeout(resolve, 30))
636+
637+
concurrentProcessing--
638+
processedMessages.push(message.value.id)
639+
consumer!.addedMessages.push(message)
640+
}),
641+
},
642+
})
643+
await consumer.init()
644+
645+
// When - publish multiple messages
646+
await Promise.all([
647+
publisher.publish('permission-added', {
648+
id: 'concurrency-1',
649+
type: 'added',
650+
permissions: [],
651+
}),
652+
publisher.publish('permission-added', {
653+
id: 'concurrency-2',
654+
type: 'added',
655+
permissions: [],
656+
}),
657+
publisher.publish('permission-added', {
658+
id: 'concurrency-3',
659+
type: 'added',
660+
permissions: [],
661+
}),
662+
])
663+
664+
// Then - wait for all messages
665+
await consumer.handlerSpy.waitForMessageWithId('concurrency-1', 'consumed')
666+
await consumer.handlerSpy.waitForMessageWithId('concurrency-2', 'consumed')
667+
await consumer.handlerSpy.waitForMessageWithId('concurrency-3', 'consumed')
668+
669+
// Verify only one message was processed at a time (max concurrency = 1)
670+
expect(maxConcurrency).toBe(1)
671+
expect(processedMessages).toHaveLength(3)
672+
expect(processedMessages).toContain('concurrency-1')
673+
expect(processedMessages).toContain('concurrency-2')
674+
expect(processedMessages).toContain('concurrency-3')
675+
})
676+
677+
it('should process messages synchronously across different topics', async () => {
678+
// Given
679+
const processingOrder: string[] = []
680+
const testMessageIds = ['cross-topic-1', 'cross-topic-2', 'cross-topic-3']
681+
682+
consumer = new PermissionConsumer(testContext.cradle, {
683+
handlers: {
684+
'permission-added': new KafkaHandlerConfig(PERMISSION_ADDED_SCHEMA, async (message) => {
685+
// Only track messages from this test
686+
if (!testMessageIds.includes(message.value.id)) {
687+
consumer!.addedMessages.push(message)
688+
return
689+
}
690+
processingOrder.push(`added-${message.value.id}`)
691+
await new Promise((resolve) => setTimeout(resolve, 20))
692+
consumer!.addedMessages.push(message)
693+
}),
694+
'permission-removed': new KafkaHandlerConfig(
695+
PERMISSION_REMOVED_SCHEMA,
696+
async (message) => {
697+
// Only track messages from this test
698+
if (!testMessageIds.includes(message.value.id)) {
699+
consumer!.removedMessages.push(message)
700+
return
701+
}
702+
processingOrder.push(`removed-${message.value.id}`)
703+
await new Promise((resolve) => setTimeout(resolve, 20))
704+
consumer!.removedMessages.push(message)
705+
},
706+
),
707+
},
708+
})
709+
await consumer.init()
710+
711+
// When - publish messages to different topics
712+
await Promise.all([
713+
publisher.publish('permission-added', {
714+
id: 'cross-topic-1',
715+
type: 'added',
716+
permissions: [],
717+
}),
718+
publisher.publish('permission-removed', {
719+
id: 'cross-topic-2',
720+
type: 'removed',
721+
permissions: [],
722+
}),
723+
publisher.publish('permission-added', {
724+
id: 'cross-topic-3',
725+
type: 'added',
726+
permissions: [],
727+
}),
728+
])
729+
730+
// Then - wait for all messages
731+
await consumer.handlerSpy.waitForMessageWithId('cross-topic-1', 'consumed')
732+
await consumer.handlerSpy.waitForMessageWithId('cross-topic-2', 'consumed')
733+
await consumer.handlerSpy.waitForMessageWithId('cross-topic-3', 'consumed')
734+
735+
// Verify messages were processed sequentially (one at a time)
736+
// Note: The exact order depends on Kafka's partition assignment, but each should complete before next starts
737+
expect(processingOrder.length).toBe(3)
738+
const testMessages =
739+
consumer.addedMessages.filter((m) => testMessageIds.includes(m.value.id)).length +
740+
consumer.removedMessages.filter((m) => testMessageIds.includes(m.value.id)).length
741+
expect(testMessages).toBe(3)
742+
})
743+
})
492744
})

0 commit comments

Comments
 (0)