Skip to content

Commit dd194cf

Browse files
committed
done with kotlin tutorials
1 parent 0a8596e commit dd194cf

File tree

7 files changed

+647
-0
lines changed

7 files changed

+647
-0
lines changed

kotlin/README.md

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,116 @@
11
# RabbitMQ Tutorials in Kotlin (JVM & Native)
22

3+
This repository contains RabbitMQ tutorials implemented using [Kourier](https://kourier.dev), a pure Kotlin multiplatform AMQP client.
34

5+
## About
6+
7+
These tutorials demonstrate the core concepts of RabbitMQ using Kotlin and the Kourier AMQP client library. All examples work on JVM, macOS (ARM64), Linux (x64), and Windows (x64) platforms through Kotlin Multiplatform.
8+
9+
## Prerequisites
10+
11+
- RabbitMQ server running on localhost (default port 5672)
12+
- Kotlin 2.1.21
13+
- Gradle
14+
15+
## Building
16+
17+
```bash
18+
./gradlew build
19+
```
20+
21+
## Tutorials
22+
23+
### 1. Hello World (HelloWorld.kt)
24+
25+
The simplest thing that does something - sending and receiving messages from a named queue.
26+
27+
Functions:
28+
- `send()` - Sends "Hello World!" message to the queue
29+
- `receive()` - Receives and prints messages from the queue
30+
31+
### 2. Work Queues (WorkQueues.kt)
32+
33+
Distributing time-consuming tasks among multiple workers.
34+
35+
Functions:
36+
- `newTask(message)` - Sends a task to the work queue (dots represent work time)
37+
- `worker(workerName)` - Processes tasks with fair dispatch and manual acknowledgment
38+
39+
Key concepts:
40+
- Message durability
41+
- Fair dispatch with `basicQos`
42+
- Manual acknowledgments
43+
44+
### 3. Publish/Subscribe (PublishSubscribe.kt)
45+
46+
Sending messages to many consumers at once using fanout exchanges.
47+
48+
Functions:
49+
- `emitLog(message)` - Publishes log messages to all subscribers
50+
- `receiveLogs(subscriberName)` - Subscribes to all log messages
51+
52+
Key concepts:
53+
- Fanout exchanges
54+
- Temporary queues
55+
- Broadcasting messages
56+
57+
### 4. Routing (Routing.kt)
58+
59+
Receiving messages selectively using direct exchanges and routing keys.
60+
61+
Functions:
62+
- `emitLogDirect(severity, message)` - Publishes log with specific severity
63+
- `receiveLogsDirect(subscriberName, severities)` - Subscribes to specific severities
64+
65+
Key concepts:
66+
- Direct exchanges
67+
- Routing keys
68+
- Multiple bindings per queue
69+
70+
### 5. Topics (Topics.kt)
71+
72+
Receiving messages based on patterns using topic exchanges.
73+
74+
Functions:
75+
- `emitLogTopic(routingKey, message)` - Publishes with topic routing key
76+
- `receiveLogsTopic(subscriberName, bindingKeys)` - Subscribes using patterns
77+
78+
Key concepts:
79+
- Topic exchanges
80+
- Wildcard patterns (`*` = one word, `#` = zero or more words)
81+
- Pattern-based routing
82+
83+
### 6. RPC (RPC.kt)
84+
85+
Request/reply pattern for remote procedure calls.
86+
87+
Functions:
88+
- `rpcServer()` - Processes Fibonacci number requests
89+
- `rpcClient(n)` - Sends RPC request and waits for response
90+
91+
Key concepts:
92+
- Callback queues
93+
- Correlation IDs
94+
- Reply-to pattern
95+
96+
## Running Examples
97+
98+
These tutorials are designed as library functions. You can call them from your own code or tests. For example:
99+
100+
```kotlin
101+
import kotlinx.coroutines.runBlocking
102+
103+
fun main() = runBlocking {
104+
// Example: Run work queue tutorial
105+
launch { worker(this, "Worker-1") }
106+
launch { worker(this, "Worker-2") }
107+
108+
delay(1000) // Give workers time to start
109+
110+
newTask(this, "Task with work...")
111+
}
112+
```
113+
114+
## More Information
115+
116+
For detailed explanations of each tutorial, see the [Kourier documentation](https://kourier.dev/tutorials/).

kotlin/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ kotlin {
1515

1616
applyDefaultHierarchyTemplate()
1717
sourceSets {
18+
all {
19+
languageSettings.apply {
20+
optIn("kotlin.uuid.ExperimentalUuidApi")
21+
}
22+
}
1823
val commonMain by getting {
1924
dependencies {
2025
api("dev.kourier:amqp-client:0.3.1")
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import dev.kourier.amqp.BuiltinExchangeType
2+
import dev.kourier.amqp.Properties
3+
import dev.kourier.amqp.connection.amqpConfig
4+
import dev.kourier.amqp.connection.createAMQPConnection
5+
import io.ktor.utils.io.core.*
6+
import kotlinx.coroutines.*
7+
8+
suspend fun emitLog(coroutineScope: CoroutineScope, message: String) {
9+
val config = amqpConfig {
10+
server {
11+
host = "localhost"
12+
}
13+
}
14+
val connection = createAMQPConnection(coroutineScope, config)
15+
val channel = connection.openChannel()
16+
17+
// Declare a fanout exchange - broadcasts to all bound queues
18+
channel.exchangeDeclare(
19+
"logs",
20+
BuiltinExchangeType.FANOUT,
21+
durable = false,
22+
autoDelete = false,
23+
internal = false,
24+
arguments = emptyMap()
25+
)
26+
27+
// Publish to the exchange (routing key is ignored for fanout)
28+
channel.basicPublish(
29+
message.toByteArray(),
30+
exchange = "logs",
31+
routingKey = "", // Routing key is ignored by fanout exchanges
32+
properties = Properties()
33+
)
34+
println(" [x] Sent '$message'")
35+
36+
channel.close()
37+
connection.close()
38+
}
39+
40+
suspend fun receiveLogs(coroutineScope: CoroutineScope, subscriberName: String) {
41+
val config = amqpConfig {
42+
server {
43+
host = "localhost"
44+
}
45+
}
46+
val connection = createAMQPConnection(coroutineScope, config)
47+
val channel = connection.openChannel()
48+
49+
// Declare the same fanout exchange
50+
channel.exchangeDeclare(
51+
"logs",
52+
BuiltinExchangeType.FANOUT,
53+
durable = false,
54+
autoDelete = false,
55+
internal = false,
56+
arguments = emptyMap()
57+
)
58+
59+
// Declare a temporary, exclusive, auto-delete queue
60+
// The server generates a unique name for us
61+
val queueDeclared = channel.queueDeclare(
62+
name = "", // Empty name = server generates a unique name
63+
durable = false,
64+
exclusive = true, // Queue is deleted when connection closes
65+
autoDelete = true, // Queue is deleted when no consumers
66+
arguments = emptyMap()
67+
)
68+
val queueName = queueDeclared.queueName
69+
println(" [$subscriberName] Created temporary queue: $queueName")
70+
71+
// Bind the queue to the exchange
72+
channel.queueBind(
73+
queue = queueName,
74+
exchange = "logs",
75+
routingKey = "" // Routing key is ignored for fanout
76+
)
77+
println(" [$subscriberName] Waiting for logs. To exit press CTRL+C")
78+
79+
// Consume messages with auto-ack (since these are just logs)
80+
val consumer = channel.basicConsume(queueName, noAck = true)
81+
82+
for (delivery in consumer) {
83+
val message = delivery.message.body.decodeToString()
84+
println(" [$subscriberName] $message")
85+
}
86+
87+
channel.close()
88+
connection.close()
89+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import dev.kourier.amqp.connection.amqpConfig
2+
import dev.kourier.amqp.connection.createAMQPConnection
3+
import dev.kourier.amqp.properties
4+
import io.ktor.utils.io.core.*
5+
import kotlinx.coroutines.*
6+
import kotlin.uuid.Uuid
7+
8+
/**
9+
* Fibonacci function - calculates Fibonacci number recursively.
10+
* Note: This is a simple recursive implementation for demonstration.
11+
* Not suitable for large numbers in production.
12+
*/
13+
private fun fib(n: Int): Int {
14+
return when {
15+
n == 0 -> 0
16+
n == 1 -> 1
17+
else -> fib(n - 1) + fib(n - 2)
18+
}
19+
}
20+
21+
/**
22+
* RPC Server - Processes Fibonacci requests and sends responses.
23+
*/
24+
suspend fun rpcServer(coroutineScope: CoroutineScope) {
25+
val config = amqpConfig {
26+
server {
27+
host = "localhost"
28+
}
29+
}
30+
val connection = createAMQPConnection(coroutineScope, config)
31+
val channel = connection.openChannel()
32+
33+
try {
34+
// Declare the RPC queue
35+
channel.queueDeclare(
36+
"rpc_queue",
37+
durable = false,
38+
exclusive = false,
39+
autoDelete = false,
40+
arguments = emptyMap()
41+
)
42+
43+
// Fair dispatch - don't give more than one message at a time
44+
channel.basicQos(count = 1u, global = false)
45+
46+
println(" [x] Awaiting RPC requests")
47+
48+
// Consume RPC requests
49+
val consumer = channel.basicConsume("rpc_queue", noAck = false)
50+
51+
for (delivery in consumer) {
52+
try {
53+
val props = delivery.message.properties
54+
val correlationId = props.correlationId
55+
val replyTo = props.replyTo
56+
57+
// Parse request (expecting an integer)
58+
val requestMessage = delivery.message.body.decodeToString()
59+
val n = requestMessage.toIntOrNull() ?: 0
60+
61+
println(" [.] fib($n)")
62+
63+
// Calculate Fibonacci
64+
val response = fib(n)
65+
66+
// Build response properties with correlation ID
67+
val replyProps = properties {
68+
this.correlationId = correlationId
69+
}
70+
71+
// Send response to the callback queue
72+
if (replyTo != null) {
73+
channel.basicPublish(
74+
response.toString().toByteArray(),
75+
exchange = "",
76+
routingKey = replyTo,
77+
properties = replyProps
78+
)
79+
}
80+
81+
// Acknowledge the request
82+
channel.basicAck(delivery.message, multiple = false)
83+
} catch (e: Exception) {
84+
println(" [.] Error processing request: ${e.message}")
85+
e.printStackTrace()
86+
channel.basicAck(delivery.message, multiple = false)
87+
}
88+
}
89+
} finally {
90+
// Ensure cleanup happens even when coroutine is cancelled
91+
channel.close()
92+
connection.close()
93+
}
94+
}
95+
96+
/**
97+
* RPC Client - Sends Fibonacci requests and waits for responses.
98+
*/
99+
suspend fun rpcClient(coroutineScope: CoroutineScope, n: Int): Int {
100+
val config = amqpConfig {
101+
server {
102+
host = "localhost"
103+
}
104+
}
105+
val connection = createAMQPConnection(coroutineScope, config)
106+
val channel = connection.openChannel()
107+
108+
try {
109+
// Create an exclusive callback queue for receiving responses
110+
val callbackQueueDeclared = channel.queueDeclare(
111+
name = "",
112+
durable = false,
113+
exclusive = true, // Exclusive to this connection
114+
autoDelete = true,
115+
arguments = emptyMap()
116+
)
117+
val callbackQueueName = callbackQueueDeclared.queueName
118+
119+
// Generate a unique correlation ID for this request
120+
val correlationId = Uuid.random().toString()
121+
122+
// Start consuming BEFORE sending the request to avoid race condition
123+
val consumer = channel.basicConsume(callbackQueueName, noAck = true)
124+
var result = 0
125+
126+
// Build request properties
127+
val requestProps = properties {
128+
this.correlationId = correlationId
129+
this.replyTo = callbackQueueName
130+
}
131+
132+
// Send the RPC request
133+
channel.basicPublish(
134+
n.toString().toByteArray(),
135+
exchange = "",
136+
routingKey = "rpc_queue",
137+
properties = requestProps
138+
)
139+
println(" [x] Requesting fib($n)")
140+
141+
withTimeout(10000) { // 10 second timeout
142+
for (delivery in consumer) {
143+
val responseCorrelationId = delivery.message.properties.correlationId
144+
145+
if (responseCorrelationId == correlationId) {
146+
// Found matching response
147+
val responseMessage = delivery.message.body.decodeToString()
148+
result = responseMessage.toInt()
149+
println(" [.] Got $result")
150+
break
151+
}
152+
}
153+
}
154+
155+
return result
156+
} finally {
157+
// Ensure cleanup happens even on timeout or error
158+
channel.close()
159+
connection.close()
160+
}
161+
}

0 commit comments

Comments
 (0)