Skip to content

Commit 7d90909

Browse files
authored
Add FIFO support to SQS (#352)
1 parent 5df8b1f commit 7d90909

26 files changed

+4173
-191
lines changed

biome.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,12 @@
88
],
99
"files": {
1010
"includes": ["**", "!.turbo", "!**/dist", "!**/coverage"]
11+
},
12+
"linter": {
13+
"rules": {
14+
"correctness": {
15+
"noUnusedPrivateClassMembers": "off"
16+
}
1117
}
18+
}
1219
}

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ services:
1010
restart: on-failure
1111

1212
localstack:
13-
image: localstack/localstack:4.0.2
13+
image: localstack/localstack:4.10.0
1414
network_mode: bridge
1515
hostname: localstack
1616
ports:

packages/core/lib/queues/MessageSchemaContainer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const DEFAULT_SCHEMA_KEY = Symbol('NO_MESSAGE_TYPE')
1212

1313
export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
1414
public readonly messageDefinitions: Record<string | symbol, CommonEventDefinition>
15+
1516
private readonly messageSchemas: Record<string | symbol, ZodSchema<MessagePayloadSchemas>>
1617
private readonly messageTypeField?: string
1718

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"zod": ">=3.25.76 <5.0.0"
3838
},
3939
"devDependencies": {
40-
"@biomejs/biome": "^2.3.2",
40+
"@biomejs/biome": "^2.3.6",
4141
"@lokalise/biome-config": "^3.1.0",
4242
"@lokalise/tsconfig": "^3.0.0",
4343
"@types/node": "^24.0.3",

packages/core/test/testContext.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import type {
2-
CommonLogger,
3-
ErrorReporter,
4-
TransactionObservabilityManager,
1+
import {
2+
type CommonLogger,
3+
type ErrorReporter,
4+
globalLogger,
5+
type TransactionObservabilityManager,
56
} from '@lokalise/node-core'
67
import { enrichMessageSchemaWithBase } from '@message-queue-toolkit/schemas'
78
import { asClass, asFunction, createContainer, Lifetime, type Resolver } from 'awilix'
89
import { AwilixManager } from 'awilix-manager'
9-
import pino from 'pino'
1010
import { z } from 'zod/v4'
1111
import { DomainEventEmitter } from '../lib/events/DomainEventEmitter.ts'
1212
import { EventRegistry } from '../lib/events/EventRegistry.ts'
@@ -19,7 +19,7 @@ export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON }
1919

2020
export type DependencyOverrides = Partial<DiConfig>
2121

22-
const TestLogger: CommonLogger = pino()
22+
const TestLogger: CommonLogger = globalLogger
2323

2424
export const TestEvents = {
2525
created: {

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ export type KafkaMessageOptions = Omit<
3030
export abstract class AbstractKafkaPublisher<
3131
TopicsConfig extends TopicConfig[],
3232
> extends AbstractKafkaService<TopicsConfig, KafkaPublisherOptions<TopicsConfig>> {
33-
private readonly topicsConfig: TopicsConfig
3433
private readonly schemaContainers: Record<string, MessageSchemaContainer<object>>
3534

3635
private readonly producer: Producer<string, object, string, string>
@@ -40,11 +39,11 @@ export abstract class AbstractKafkaPublisher<
4039
super(dependencies, options)
4140
this.isInitiated = false
4241

43-
this.topicsConfig = options.topicsConfig
44-
if (this.topicsConfig.length === 0) throw new Error('At least one topic must be defined')
42+
const topicsConfig = options.topicsConfig
43+
if (topicsConfig.length === 0) throw new Error('At least one topic must be defined')
4544

4645
this.schemaContainers = {}
47-
for (const { topic, schema } of this.topicsConfig) {
46+
for (const { topic, schema } of topicsConfig) {
4847
this.schemaContainers[topic] = new MessageSchemaContainer({
4948
messageSchemas: [schema],
5049
messageDefinitions: [],

packages/kafka/test/publisher/PermissionPublisher.spec.ts

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,27 @@ describe('PermissionPublisher', () => {
104104
expect(error.cause.errors[0].errors[0].apiId).toBe('UNKNOWN_TOPIC_OR_PARTITION')
105105
})
106106

107-
it.each([false, true])(
108-
'should auto create topic if creation topic is used (lazy init: %s)',
109-
async (lazyInit) => {
110-
// Given
111-
publisher = new PermissionPublisher(testContext.cradle, {
112-
autocreateTopics: true,
113-
})
107+
it.each([
108+
false,
109+
true,
110+
])('should auto create topic if creation topic is used (lazy init: %s)', async (lazyInit) => {
111+
// Given
112+
publisher = new PermissionPublisher(testContext.cradle, {
113+
autocreateTopics: true,
114+
})
114115

115-
// When
116-
if (!lazyInit) await publisher.init()
117-
await publisher.publish('permission-added', {
118-
id: '1',
119-
type: 'added',
120-
permissions: [],
121-
})
116+
// When
117+
if (!lazyInit) await publisher.init()
118+
await publisher.publish('permission-added', {
119+
id: '1',
120+
type: 'added',
121+
permissions: [],
122+
})
122123

123-
// Then
124-
const emittedEvent = await publisher.handlerSpy.waitForMessageWithId('1', 'published')
125-
expect(emittedEvent.message).toMatchObject({ id: '1', type: 'added' })
126-
},
127-
)
124+
// Then
125+
const emittedEvent = await publisher.handlerSpy.waitForMessageWithId('1', 'published')
126+
expect(emittedEvent.message).toMatchObject({ id: '1', type: 'added' })
127+
})
128128
})
129129

130130
describe('publish', () => {

packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -797,64 +797,64 @@ describe('SnsSqsPermissionConsumer', () => {
797797
await diContainer.dispose()
798798
})
799799

800-
it.each([false, true])(
801-
'using 2 consumers with heartbeat -> %s',
802-
async (heartbeatEnabled) => {
803-
let consumer1IsProcessing = false
804-
let consumer1Counter = 0
805-
let consumer2Counter = 0
806-
807-
const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, {
808-
creationConfig: {
809-
topic: { Name: topicName },
810-
queue: {
811-
QueueName: queueName,
812-
Attributes: { VisibilityTimeout: '2' },
813-
},
814-
},
815-
consumerOverrides: {
816-
heartbeatInterval: heartbeatEnabled ? 1 : undefined,
817-
},
818-
removeHandlerOverride: async () => {
819-
consumer1IsProcessing = true
820-
await setTimeout(3100) // Wait to the visibility timeout to expire
821-
consumer1Counter++
822-
consumer1IsProcessing = false
823-
return { result: 'success' }
824-
},
825-
})
826-
await consumer1.start()
800+
it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => {
801+
let consumer1IsProcessing = false
802+
let consumer1Counter = 0
803+
let consumer2Counter = 0
827804

828-
const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, {
829-
locatorConfig: {
830-
queueUrl: consumer1.subscriptionProps.queueUrl,
831-
topicArn: consumer1.subscriptionProps.topicArn,
832-
subscriptionArn: consumer1.subscriptionProps.subscriptionArn,
833-
},
834-
removeHandlerOverride: () => {
835-
consumer2Counter++
836-
return Promise.resolve({ result: 'success' })
805+
const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, {
806+
creationConfig: {
807+
topic: { Name: topicName },
808+
queue: {
809+
QueueName: queueName,
810+
Attributes: { VisibilityTimeout: '2' },
837811
},
838-
})
839-
const publisher = new SnsPermissionPublisher(diContainer.cradle, {
840-
locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn },
841-
})
812+
},
813+
consumerOverrides: {
814+
heartbeatInterval: heartbeatEnabled ? 1 : undefined,
815+
},
816+
removeHandlerOverride: async () => {
817+
consumer1IsProcessing = true
818+
await setTimeout(3100) // Wait to the visibility timeout to expire
819+
consumer1Counter++
820+
consumer1IsProcessing = false
821+
return { result: 'success' }
822+
},
823+
})
824+
await consumer1.start()
842825

843-
await publisher.publish({ id: '10', messageType: 'remove' })
844-
// wait for consumer1 to start processing to start second consumer
845-
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
846-
await consumer2.start()
826+
const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, {
827+
locatorConfig: {
828+
queueUrl: consumer1.subscriptionProps.queueUrl,
829+
topicArn: consumer1.subscriptionProps.topicArn,
830+
subscriptionArn: consumer1.subscriptionProps.subscriptionArn,
831+
},
832+
removeHandlerOverride: () => {
833+
consumer2Counter++
834+
return Promise.resolve({ result: 'success' })
835+
},
836+
})
837+
const publisher = new SnsPermissionPublisher(diContainer.cradle, {
838+
locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn },
839+
})
840+
841+
await publisher.publish({ id: '10', messageType: 'remove' })
842+
// wait for consumer1 to start processing to start second consumer
843+
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
844+
await consumer2.start()
847845

848-
// wait for both consumers to process message
849-
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)
846+
// wait for consumer1 to process, and consumer2 only when heartbeat is disabled
847+
await waitAndRetry(
848+
() => consumer1Counter > 0 && (heartbeatEnabled || consumer2Counter > 0),
849+
100,
850+
40,
851+
)
850852

851-
expect(consumer1Counter).toBe(1)
852-
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
853+
expect(consumer1Counter).toBe(1)
854+
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
853855

854-
await Promise.all([consumer1.close(), consumer2.close()])
855-
},
856-
10000,
857-
)
856+
await Promise.all([consumer1.close(), consumer2.close()])
857+
}, 10000)
858858
})
859859

860860
describe('exponential backoff retry', () => {

packages/sns/test/utils/testSnsConfig.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import type { S3ClientConfig } from '@aws-sdk/client-s3'
22
import type { SNSClientConfig } from '@aws-sdk/client-sns'
3+
import type { SQSClientConfig } from '@aws-sdk/client-sqs'
34

4-
export const TEST_AWS_CONFIG: SNSClientConfig & S3ClientConfig = {
5+
export const TEST_AWS_CONFIG: SNSClientConfig & SQSClientConfig & S3ClientConfig = {
56
endpoint: 'http://s3.localhost.localstack.cloud:4566',
67
region: 'eu-west-1',
78
credentials: {

0 commit comments

Comments
 (0)