From 1daeb8c3704df0eca170662ccb04242c5776fa6d Mon Sep 17 00:00:00 2001 From: NathanFallet Date: Sat, 29 Nov 2025 13:42:20 +0100 Subject: [PATCH 1/2] adding kotlin tutorials --- tutorials/index.md | 7 + tutorials/tutorial-five-kotlin.md | 292 ++++++++++++++++++ tutorials/tutorial-four-kotlin.md | 309 +++++++++++++++++++ tutorials/tutorial-one-kotlin.md | 331 +++++++++++++++++++++ tutorials/tutorial-seven-kotlin.md | 379 ++++++++++++++++++++++++ tutorials/tutorial-six-kotlin.md | 379 ++++++++++++++++++++++++ tutorials/tutorial-three-kotlin.md | 366 +++++++++++++++++++++++ tutorials/tutorial-two-kotlin.md | 456 +++++++++++++++++++++++++++++ 8 files changed, 2519 insertions(+) create mode 100644 tutorials/tutorial-five-kotlin.md create mode 100644 tutorials/tutorial-four-kotlin.md create mode 100644 tutorials/tutorial-one-kotlin.md create mode 100644 tutorials/tutorial-seven-kotlin.md create mode 100644 tutorials/tutorial-six-kotlin.md create mode 100644 tutorials/tutorial-three-kotlin.md create mode 100644 tutorials/tutorial-two-kotlin.md diff --git a/tutorials/index.md b/tutorials/index.md index caceec07ea..e3934b44cb 100644 --- a/tutorials/index.md +++ b/tutorials/index.md @@ -72,6 +72,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Python](tutorials/tutorial-one-python) * [Java](tutorials/tutorial-one-java) + * [Kotlin](tutorials/tutorial-one-kotlin) * [Ruby](tutorials/tutorial-one-ruby) * [PHP](tutorials/tutorial-one-php) * [C#](tutorials/tutorial-one-dotnet) @@ -92,6 +93,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Python](tutorials/tutorial-two-python) * [Java](tutorials/tutorial-two-java) + * [Kotlin](tutorials/tutorial-two-kotlin) * [Ruby](tutorials/tutorial-two-ruby) * [PHP](tutorials/tutorial-two-php) * [C#](tutorials/tutorial-two-dotnet) @@ -112,6 +114,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Python](tutorials/tutorial-three-python) * [Java](tutorials/tutorial-three-java) + * [Kotlin](tutorials/tutorial-three-kotlin) * [Ruby](tutorials/tutorial-three-ruby) * [PHP](tutorials/tutorial-three-php) * [C#](tutorials/tutorial-three-dotnet) @@ -134,6 +137,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Python](tutorials/tutorial-four-python) * [Java](tutorials/tutorial-four-java) + * [Kotlin](tutorials/tutorial-four-kotlin) * [Ruby](tutorials/tutorial-four-ruby) * [PHP](tutorials/tutorial-four-php) * [C#](tutorials/tutorial-four-dotnet) @@ -154,6 +158,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Python](tutorials/tutorial-five-python) * [Java](tutorials/tutorial-five-java) + * [Kotlin](tutorials/tutorial-five-kotlin) * [Ruby](tutorials/tutorial-five-ruby) * [PHP](tutorials/tutorial-five-php) * [C#](tutorials/tutorial-five-dotnet) @@ -174,6 +179,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Python](tutorials/tutorial-six-python) * [Java](tutorials/tutorial-six-java) + * [Kotlin](tutorials/tutorial-six-kotlin) * [Ruby](tutorials/tutorial-six-ruby) * [PHP](tutorials/tutorial-six-php) * [C#](tutorials/tutorial-six-dotnet) @@ -192,6 +198,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. * [Java](tutorials/tutorial-seven-java) * [C#](tutorials/tutorial-seven-dotnet) + * [Kotlin](tutorials/tutorial-seven-kotlin) * [PHP](tutorials/tutorial-seven-php) diff --git a/tutorials/tutorial-five-kotlin.md b/tutorials/tutorial-five-kotlin.md new file mode 100644 index 0000000000..829087c03c --- /dev/null +++ b/tutorials/tutorial-five-kotlin.md @@ -0,0 +1,292 @@ +--- +title: RabbitMQ tutorial - Topics +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T5DiagramToC from '@site/src/components/Tutorials/T5DiagramToC.md'; +import T5DiagramTopicX from '@site/src/components/Tutorials/T5DiagramTopicX.md'; + +# RabbitMQ tutorial - Topics + +## Topics +### (using the Kotlin Client) + + + + + +In the [previous tutorial](./tutorial-four-kotlin) we improved our +logging system. Instead of using a `fanout` exchange only capable of +dummy broadcasting, we used a `direct` one, and gained a possibility +of selectively receiving the logs. + +Although using the `direct` exchange improved our system, it still has +limitations - it can't do routing based on multiple criteria. + +In our logging system we might want to subscribe to not only logs +based on severity, but also based on the source which emitted the log. +You might know this concept from the +[`syslog`](http://en.wikipedia.org/wiki/Syslog) unix tool, which +routes logs based on both severity (info/warn/crit...) and facility +(auth/cron/kern...). + +That would give us a lot of flexibility - we may want to listen to just +critical errors coming from 'cron' but also all logs from 'kern'. + +To implement that in our logging system we need to learn about a more +complex `topic` exchange. + + +Topic exchange +-------------- + +Messages sent to a `topic` exchange can't have an arbitrary +`routing_key` - it must be a list of words, delimited by dots. The +words can be anything, but usually they specify some features +connected to the message. A few valid routing key examples: +"`stock.usd.nyse`", "`nyse.vmw`", "`quick.orange.rabbit`". There can be as +many words in the routing key as you like, up to the limit of 255 +bytes. + +The binding key must also be in the same form. The logic behind the +`topic` exchange is similar to a `direct` one - a message sent with a +particular routing key will be delivered to all the queues that are +bound with a matching binding key. However there are two important +special cases for binding keys: + + * `*` (star) can substitute for exactly one word. + * `#` (hash) can substitute for zero or more words. + +It's easiest to explain this in an example: + + + +In this example, we're going to send messages which all describe +animals. The messages will be sent with a routing key that consists of +three words (two dots). The first word in the routing key will +describe speed, second a colour and third a species: +"`..`". + +We created three bindings: Q1 is bound with binding key "`*.orange.*`" +and Q2 with "`*.*.rabbit`" and "`lazy.#`". + +These bindings can be summarised as: + + * Q1 is interested in all the orange animals. + * Q2 wants to hear everything about rabbits, and everything about lazy + animals. + +A message with a routing key set to "`quick.orange.rabbit`" +will be delivered to both queues. Message "`lazy.orange.elephant`" +also will go to both of them. On the other hand +"`quick.orange.fox`" will only go to the first queue, and +"`lazy.brown.fox`" only to the second. "`lazy.pink.rabbit`" will +be delivered to the second queue only once, even though it matches two bindings. +"`quick.brown.fox`" doesn't match any binding so it will be discarded. + +What happens if we break our contract and send a message with one or +four words, like "`orange`" or "`quick.orange.male.rabbit`"? Well, +these messages won't match any bindings and will be lost. + +On the other hand "`lazy.orange.male.rabbit`", even though it has four +words, will match the last binding and will be delivered to the second +queue. + +> #### Topic exchange +> +> Topic exchange is powerful and can behave like other exchanges. +> +> When a queue is bound with "`#`" (hash) binding key - it will receive +> all the messages, regardless of the routing key - like in `fanout` exchange. +> +> When special characters "`*`" (star) and "`#`" (hash) aren't used in bindings, +> the topic exchange will behave just like a `direct` one. + +Putting it all together +----------------------- + +We're going to use a `topic` exchange in our logging system. We'll +start off with a working assumption that the routing keys of logs will +have two words: "`.`". + +The code is almost the same as in the +[previous tutorial](./tutorial-four-kotlin). + +The code for `emitLogTopic`: + +```kotlin +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope + +suspend fun emitLogTopic(coroutineScope: CoroutineScope, routingKey: String, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.exchangeDeclare( + "topic_logs", + BuiltinExchangeType.TOPIC, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + channel.basicPublish( + message.toByteArray(), + exchange = "topic_logs", + routingKey = routingKey, + properties = Properties() + ) + println(" [x] Sent '$routingKey':'$message'") + + channel.close() + connection.close() +} +``` + +The code for `receiveLogsTopic`: + +```kotlin +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope + +suspend fun receiveLogsTopic( + coroutineScope: CoroutineScope, + bindingKeys: List +) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.exchangeDeclare( + "topic_logs", + BuiltinExchangeType.TOPIC, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() + ) + val queueName = queueDeclared.queueName + + if (bindingKeys.isEmpty()) { + System.err.println("Usage: receiveLogsTopic [binding_key]...") + return + } + + for (bindingKey in bindingKeys) { + channel.queueBind( + queue = queueName, + exchange = "topic_logs", + routingKey = bindingKey + ) + } + + println(" [*] Waiting for logs. To exit press CTRL+C") + + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + val routingKey = delivery.message.routingKey + println(" [x] Received '$routingKey':'$message'") + } + + channel.close() + connection.close() +} +``` + +To receive all the logs: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + receiveLogsTopic(this, listOf("#")) +} +``` + +To receive all logs from the facility "`kern`": + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + receiveLogsTopic(this, listOf("kern.*")) +} +``` + +Or if you want to hear only about "`critical`" logs: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + receiveLogsTopic(this, listOf("*.critical")) +} +``` + +You can create multiple bindings: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + receiveLogsTopic(this, listOf("kern.*", "*.critical")) +} +``` + +And to emit a log with a routing key "`kern.critical`" type: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + emitLogTopic(this, "kern.critical", "A critical kernel error") +} +``` + +Have fun playing with these programs. Note that the code doesn't make +any assumption about the routing or binding keys, you may want to play +with more than two routing key parameters. + +Move on to [tutorial 6](./tutorial-six-kotlin) to find out how to do a +round trip message as a remote procedure call. diff --git a/tutorials/tutorial-four-kotlin.md b/tutorials/tutorial-four-kotlin.md new file mode 100644 index 0000000000..7ed9d40801 --- /dev/null +++ b/tutorials/tutorial-four-kotlin.md @@ -0,0 +1,309 @@ +--- +title: RabbitMQ tutorial - Routing +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T4DiagramDirectX from '@site/src/components/Tutorials/T4DiagramDirectX.md'; +import T4DiagramMultipleBindings from '@site/src/components/Tutorials/T4DiagramMultipleBindings.md'; +import T4DiagramFull from '@site/src/components/Tutorials/T4DiagramFull.md'; + +# RabbitMQ tutorial - Routing + +## Routing +### (using the Kotlin Client) + + + +In the [previous tutorial](./tutorial-three-kotlin) we built a +simple logging system. We were able to broadcast log messages to many +receivers. + +In this tutorial we're going to add a feature to it - we're going to +make it possible to subscribe only to a subset of the messages. For +example, we will be able to direct only critical error messages to the +log file (to save disk space), while still being able to print all of +the log messages on the console. + + +Bindings +-------- + +In previous examples we were already creating bindings. You may recall +code like: + +```kotlin +channel.queueBind(queueName, "logs", routingKey = "") +``` + +A binding is a relationship between an exchange and a queue. This can +be simply read as: the queue is interested in messages from this +exchange. + +Bindings can take an extra `routingKey` parameter. To avoid the +confusion with a `basicPublish` parameter we're going to call it a +`binding key`. This is how we could create a binding with a key: + +```kotlin +channel.queueBind(queueName, "direct_logs", routingKey = "black") +``` + +The meaning of a binding key depends on the exchange type. The +`fanout` exchanges, which we used previously, simply ignored its +value. + +Direct exchange +--------------- + +Our logging system from the previous tutorial broadcasts all messages +to all consumers. We want to extend that to allow filtering messages +based on their severity. For example we may want a program which +writes log messages to the disk to only receive critical errors, and +not waste disk space on warning or info log messages. + +We were using a `fanout` exchange, which doesn't give us much +flexibility - it's only capable of mindless broadcasting. + +We will use a `direct` exchange instead. The routing algorithm behind +a `direct` exchange is simple - a message goes to the queues whose +`binding key` exactly matches the `routing key` of the message. + +To illustrate that, consider the following setup: + + + +In this setup, we can see the `direct` exchange `X` with two queues bound +to it. The first queue is bound with binding key `orange`, and the second +has two bindings, one with binding key `black` and the other one +with `green`. + +In such a setup a message published to the exchange with a routing key +`orange` will be routed to queue `Q1`. Messages with a routing key of `black` +or `green` will go to `Q2`. All other messages will be discarded. + + +Multiple bindings +----------------- + + +It is perfectly legal to bind multiple queues with the same binding +key. In our example we could add a binding between `X` and `Q1` with +binding key `black`. In that case, the `direct` exchange will behave +like `fanout` and will broadcast the message to all the matching +queues. A message with routing key `black` will be delivered to both +`Q1` and `Q2`. + + +Emitting logs +------------- + +We'll use this model for our logging system. Instead of `fanout` we'll +send messages to a `direct` exchange. We will supply the log severity as +a `routing key`. That way the receiving program will be able to select +the severity it wants to receive. Let's focus on emitting logs +first. + +As always, we need to create an exchange first: + +```kotlin +channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT) +``` + +And we're ready to send a message: + +```kotlin +channel.basicPublish( + message.toByteArray(), + exchange = "direct_logs", + routingKey = severity, + properties = Properties() +) +``` + +To simplify things we will assume that 'severity' can be one of +`info`, `warning`, or `error`. + + +Subscribing +----------- + +Receiving messages will work just like in the previous tutorial, with +one exception - we're going to create a new binding for each severity +we're interested in. + +```kotlin +val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() +) +val queueName = queueDeclared.queueName + +for (severity in severities) { + channel.queueBind(queueName, "direct_logs", routingKey = severity) +} +``` + + +Putting it all together +----------------------- + + + + +The code for `emitLogDirect` function: + +```kotlin +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope + +suspend fun emitLogDirect(coroutineScope: CoroutineScope, severity: String, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.exchangeDeclare( + "direct_logs", + BuiltinExchangeType.DIRECT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + channel.basicPublish( + message.toByteArray(), + exchange = "direct_logs", + routingKey = severity, + properties = Properties() + ) + println(" [x] Sent '$severity':'$message'") + + channel.close() + connection.close() +} +``` + +The code for `receiveLogsDirect`: + +```kotlin +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope + +suspend fun receiveLogsDirect( + coroutineScope: CoroutineScope, + severities: List +) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.exchangeDeclare( + "direct_logs", + BuiltinExchangeType.DIRECT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() + ) + val queueName = queueDeclared.queueName + + if (severities.isEmpty()) { + System.err.println("Usage: receiveLogsDirect [info] [warning] [error]") + return + } + + for (severity in severities) { + channel.queueBind( + queue = queueName, + exchange = "direct_logs", + routingKey = severity + ) + } + println(" [*] Waiting for messages. To exit press CTRL+C") + + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + val routingKey = delivery.message.routingKey + println(" [x] Received '$routingKey':'$message'") + } + + channel.close() + connection.close() +} +``` + +If you want to save only 'warning' and 'error' (and not 'info') log +messages to a file, just open a console and type: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + receiveLogsDirect(this, listOf("warning", "error")) +} +``` + +If you'd like to see all the log messages on your screen, open a new +terminal and do: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + receiveLogsDirect(this, listOf("info", "warning", "error")) +} +``` + +And, for example, to emit an `error` log message just type: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + emitLogDirect(this, "error", "Run. Run. Or it will explode.") +} +``` + +Move on to [tutorial 5](./tutorial-five-kotlin) to find out how to listen +for messages based on a pattern. diff --git a/tutorials/tutorial-one-kotlin.md b/tutorials/tutorial-one-kotlin.md new file mode 100644 index 0000000000..3f92dfc7a9 --- /dev/null +++ b/tutorials/tutorial-one-kotlin.md @@ -0,0 +1,331 @@ +--- +title: RabbitMQ tutorial - "Hello World!" +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import TutorialsIntro from '@site/src/components/Tutorials/TutorialsIntro.md'; +import T1DiagramHello from '@site/src/components/Tutorials/T1DiagramHello.md'; +import T1DiagramSending from '@site/src/components/Tutorials/T1DiagramSending.md'; +import T1DiagramReceiving from '@site/src/components/Tutorials/T1DiagramReceiving.md'; + +# RabbitMQ tutorial - "Hello World!" + +## Introduction + + + + +## "Hello World" +### (using the Kotlin Client) + +In this part of the tutorial we'll write two programs in Kotlin; a +producer that sends a single message, and a consumer that receives +messages and prints them out. We'll gloss over some of the detail in +the Kotlin API, concentrating on this very simple thing just to get +started. It's the "Hello World" of messaging. + +In the diagram below, "P" is our producer and "C" is our consumer. The +box in the middle is a queue - a message buffer that RabbitMQ keeps +on behalf of the consumer. + + + +> #### The Kotlin client library +> +> RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, +> general-purpose protocol for messaging. There are a number of clients +> for RabbitMQ in [many different +> languages](/client-libraries/devtools). We'll +> use the [Kourier](https://github.com/NathanFallet/Kourier) client library for Kotlin. +> +> Kourier is a modern, coroutine-based AMQP 0-9-1 client for Kotlin. To use it in your project, +> add the following dependency: +> +> **Gradle (Kotlin DSL):** +> ```kotlin +> dependencies { +> implementation("dev.kourier:kourier-amqp:1.0.0") +> } +> ``` +> +> **Gradle (Groovy DSL):** +> ```groovy +> dependencies { +> implementation 'dev.kourier:kourier-amqp:1.0.0' +> } +> ``` +> +> **Maven:** +> ```xml +> +> dev.kourier +> kourier-amqp +> 1.0.0 +> +> ``` + +Now we have the Kotlin client library set up, we can write some code. + +### Sending + + + +We'll call our message publisher (sender) `send` and our message consumer (receiver) +`receive`. The publisher will connect to RabbitMQ, send a single message, +then exit. + +We need some imports: + +```kotlin +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope +``` + +Set up the send function and the queue name: + +```kotlin +val queueName = "hello" + +suspend fun send(coroutineScope: CoroutineScope) { + // ... +} +``` + +Then we can create a connection to the server: + +```kotlin +suspend fun send(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + // Publishing code will go here... + + channel.close() + connection.close() +} +``` + +The connection abstracts the socket connection, and takes care of +protocol version negotiation and authentication and so on for us. Here +we connect to a RabbitMQ node on the local machine - hence the +_localhost_. If we wanted to connect to a node on a different +machine we'd simply specify its hostname or IP address here. + +Next we create a channel, which is where most of the API for getting +things done resides. + +To send, we must declare a queue for us to send to; then we can publish a message +to the queue: + +```kotlin +channel.queueDeclare( + queueName, + durable = false, + exclusive = false, + autoDelete = false, + arguments = emptyMap() +) +val message = "Hello World!" +channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = queueName, + properties = Properties() +) +println(" [x] Sent '$message'") +``` + +Declaring a queue is idempotent - it will only be created if it doesn't +exist already. The message content is a byte array, so you can encode +whatever you like there. + +> #### Sending doesn't work! +> +> If this is your first time using RabbitMQ and you don't see the "Sent" +> message then you may be left scratching your head wondering what could +> be wrong. Maybe the broker was started without enough free disk space +> (by default it needs at least 50 MB free) and is therefore refusing to +> accept messages. Check the broker [log file](/docs/logging/) to see if there +> is a [resource alarm](/docs/alarms) logged and reduce the +> free disk space threshold if necessary. +> The [Configuration guide](/docs/configure#config-items) +> will show you how to set disk_free_limit. + + +### Receiving + +That's it for our publisher. Our consumer listens for messages from +RabbitMQ, so unlike the publisher which publishes a single message, we'll +keep the consumer running to listen for messages and print them out. + + + +Setting up is the same as the publisher; we open a connection and a +channel, and declare the queue from which we're going to consume. +Note this matches up with the queue that `send` publishes to. + +```kotlin +suspend fun receive(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + queueName, + durable = false, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + println(" [*] Waiting for messages. To exit press CTRL+C") + + // Consuming code will go here... + + channel.close() + connection.close() +} +``` + +Note that we declare the queue here, as well. Because we might start +the consumer before the publisher, we want to make sure the queue exists +before we try to consume messages from it. + +We're about to tell the server to deliver us the messages from the +queue. Since Kourier is built on Kotlin coroutines, consuming messages +is as simple as iterating over a channel: + +```kotlin +val consumer = channel.basicConsume(queueName, noAck = true) + +for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [x] Received '$message'") +} +``` + +### Putting it all together + +You can wrap both functions in a `main` function with a `runBlocking` block: + +```kotlin +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +val queueName = "hello" + +suspend fun send(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + queueName, + durable = false, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + val message = "Hello World!" + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = queueName, + properties = Properties() + ) + println(" [x] Sent '$message'") + + channel.close() + connection.close() +} + +suspend fun receive(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + queueName, + durable = false, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + println(" [*] Waiting for messages. To exit press CTRL+C") + + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [x] Received '$message'") + } + + channel.close() + connection.close() +} + +fun main() = runBlocking { + launch { send(this) } + launch { receive(this) } + + delay(Long.MAX_VALUE) // Keep the main thread alive +} +``` + +The consumer will print the message it gets from the publisher via +RabbitMQ. The consumer will keep running, waiting for messages (Use Ctrl-C to stop it). + +> #### Listing queues +> +> You may wish to see what queues RabbitMQ has and how many +> messages are in them. You can do it (as a privileged user) using the `rabbitmqctl` tool: +> +> ```bash +> sudo rabbitmqctl list_queues +> ``` +> +> On Windows, omit the sudo: +> ```PowerShell +> rabbitmqctl.bat list_queues +> ``` + + +Time to move on to [part 2](./tutorial-two-kotlin) and build a simple _work queue_. diff --git a/tutorials/tutorial-seven-kotlin.md b/tutorials/tutorial-seven-kotlin.md new file mode 100644 index 0000000000..f705154b02 --- /dev/null +++ b/tutorials/tutorial-seven-kotlin.md @@ -0,0 +1,379 @@ +--- +title: RabbitMQ tutorial - Publisher Confirms +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; + +# RabbitMQ tutorial - Publisher Confirms + +## Publisher Confirms +### (using the Kotlin Client) + + + +[Publisher confirms](/docs/confirms#publisher-confirms) are a RabbitMQ extension +to implement reliable publishing. When publisher confirms are enabled on a +channel, messages the client publishes are confirmed asynchronously by the +broker, meaning they have been taken care of on the server side. + +### Overview + +In this tutorial we'll use publisher confirms to make sure published +messages have safely reached the broker. We will cover several +strategies for using publisher confirms and explain their pros and cons. + +### Enabling Publisher Confirms on a Channel + +Publisher confirms are a RabbitMQ extension to the AMQP 0.9.1 protocol. +Publisher confirms are enabled at the channel level. To enable them, use the +`confirmSelect` method: + +```kotlin +channel.confirmSelect() +``` + +This method must be called on every channel that you expect to use publisher +confirms. Confirms should be enabled just once, not for every message published. + +### Strategy #1: Publishing Messages Individually + +Let's start with the simplest approach to publishing with confirms, +that is, publishing a message and waiting synchronously for its confirmation: + +```kotlin +suspend fun publishMessagesIndividually(channel: AMQPChannel, messages: List) { + channel.confirmSelect() + + for (message in messages) { + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "my_queue", + properties = Properties() + ) + + // Wait for confirm + val confirm = channel.publishConfirmResponses.first() + + when (confirm) { + is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> { + println("✓ Message confirmed: $message") + } + is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> { + println("✗ Message rejected: $message") + // Handle rejection (retry, log, etc.) + } + } + } +} +``` + +In the previous example we publish a message as usual and wait for its +confirmation with the `first()` call. The method returns as +soon as the message has been confirmed. If the message is not confirmed +within the timeout or if it is nack-ed (meaning the broker could not take +care of it for some reason), the method will throw an exception. The handling +of the exception usually consists in logging an error message and/or retrying +to send the message. + +Different client libraries have different ways to synchronously deal with publisher +confirms, so make sure to read carefully the documentation of the client you are using. + +This technique is very straightforward but also has a major drawback: +it **significantly slows down publishing**, as the confirmation of a message blocks +the publishing of all subsequent messages. This approach is not going to +deliver throughput of more than a few hundreds of published messages per second. +Nevertheless, this can be good enough for some applications. + +> #### Are Publisher Confirms Asynchronous? +> +> We mentioned at the beginning that the broker confirms published +> messages asynchronously but in the first example the code waits +> synchronously until the message is confirmed. The client actually +> receives confirms asynchronously and unblocks the call to `first()` +> accordingly. Think of `first()` as a synchronous helper which +> relies on asynchronous notifications under the hood. + +### Strategy #2: Publishing Messages in Batches + +To improve upon our previous example, we can publish a batch of messages +and wait for this whole batch to be confirmed. The following example uses +a batch of 100: + +```kotlin +suspend fun publishMessagesInBatch(channel: AMQPChannel, messages: List, batchSize: Int) { + channel.confirmSelect() + + messages.chunked(batchSize).forEach { batch -> + // Publish entire batch + batch.forEach { message -> + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "my_queue", + properties = Properties() + ) + } + + // Wait for all confirms for this batch + val confirms = channel.publishConfirmResponses.take(batch.size).toList() + + val ackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Ack } + val nackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Nack } + + println("Batch complete: $ackCount acks, $nackCount nacks") + + if (nackCount > 0) { + // Handle failures (can't identify specific messages easily) + println("Warning: Some messages in batch were rejected") + } + } +} +``` + +Waiting for a batch of messages to be confirmed improves throughput drastically over +waiting for a confirm for individual message (up to 20-30 times with a remote RabbitMQ node). +One drawback is that we do not know exactly what went wrong in case of failure, +so we may have to keep a whole batch in memory to log something meaningful or +to re-publish the messages. And this solution is still synchronous, so it +blocks the publishing of messages. + +### Strategy #3: Handling Publisher Confirms Asynchronously + +The broker confirms published messages asynchronously, one just needs to +register a callback on the client to be notified of these confirms: + +```kotlin +suspend fun publishMessagesAsync(channel: AMQPChannel, messages: List) { + channel.confirmSelect() + + val outstandingConfirms = mutableMapOf() + var nextDeliveryTag = 1UL + + // Launch coroutine to handle confirms + val confirmJob = launch { + channel.publishConfirmResponses.collect { confirm -> + when (confirm) { + is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> { + if (confirm.multiple) { + // Remove all up to and including this tag + outstandingConfirms.keys.filter { it <= confirm.deliveryTag } + .forEach { outstandingConfirms.remove(it) } + } else { + outstandingConfirms.remove(confirm.deliveryTag) + } + } + is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> { + val message = outstandingConfirms[confirm.deliveryTag] + println("✗ Message nacked: $message") + // Handle specific message rejection + outstandingConfirms.remove(confirm.deliveryTag) + } + } + } + } + + // Publish all messages + messages.forEach { message -> + outstandingConfirms[nextDeliveryTag] = message + + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "my_queue", + properties = Properties() + ) + + nextDeliveryTag++ + } + + // Wait until all confirms are received + while (outstandingConfirms.isNotEmpty()) { + delay(10) + } + + confirmJob.cancel() +} +``` + +In this example we use Kotlin's Flow API to handle confirms asynchronously. We collect +confirms from the `publishConfirmResponses` flow. The callback will be invoked for each +confirmed message. We keep track of outstanding confirms with a map. When a confirm arrives, +we remove the entry from the map. If the confirm indicates that multiple messages have been +confirmed (the `multiple` field is `true`), we remove all messages up to and including the +confirmed delivery tag. + +The async approach for handling confirms requires tracking of published messages. We use +a concurrent map to correlate the publish delivery tag with the message content. This is +necessary for logging meaningful information or to re-publish a message that has been +nack-ed. The handling of confirms can also be decomposed into a fire-and-forget approach: +a background task or flow can handle the confirms and update the map accordingly. + +### Summary + +Making sure published messages made it to the broker can be essential in some applications. +Publisher confirms are a RabbitMQ feature that helps to meet this requirement. Publisher +confirms are asynchronous in nature but it is also possible to handle them synchronously. +There is no definitive way to implement publisher confirms, this usually comes down to +the constraints in the application and in the overall system. Typical techniques are: + +* publish messages individually, wait for the confirmation synchronously: simple, but very + limited throughput. +* publish messages in batch, wait for the confirmation synchronously for a batch: simple, + reasonable throughput, but hard to reason about when something goes wrong. +* asynchronous handling: best performance and use of resources, good control in case of + error, but can be involved to implement correctly. + +### Putting it all together + +The full example code: + +```kotlin +import dev.kourier.amqp.AMQPResponse +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList + +suspend fun publishMessagesIndividually(channel: AMQPChannel, messages: List) { + channel.confirmSelect() + + for (message in messages) { + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "my_queue", + properties = Properties() + ) + + val confirm = channel.publishConfirmResponses.first() + + when (confirm) { + is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> { + println("✓ Message confirmed: $message") + } + is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> { + println("✗ Message rejected: $message") + } + } + } +} + +suspend fun publishMessagesInBatch(channel: AMQPChannel, messages: List, batchSize: Int) { + channel.confirmSelect() + + messages.chunked(batchSize).forEach { batch -> + batch.forEach { message -> + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "my_queue", + properties = Properties() + ) + } + + val confirms = channel.publishConfirmResponses.take(batch.size).toList() + + val ackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Ack } + val nackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Nack } + + println("Batch complete: $ackCount acks, $nackCount nacks") + + if (nackCount > 0) { + println("Warning: Some messages in batch were rejected") + } + } +} + +suspend fun publishMessagesAsync(channel: AMQPChannel, messages: List) { + channel.confirmSelect() + + val outstandingConfirms = mutableMapOf() + var nextDeliveryTag = 1UL + + val confirmJob = launch { + channel.publishConfirmResponses.collect { confirm -> + when (confirm) { + is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> { + if (confirm.multiple) { + outstandingConfirms.keys.filter { it <= confirm.deliveryTag } + .forEach { outstandingConfirms.remove(it) } + } else { + outstandingConfirms.remove(confirm.deliveryTag) + } + } + is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> { + val message = outstandingConfirms[confirm.deliveryTag] + println("✗ Message nacked: $message") + outstandingConfirms.remove(confirm.deliveryTag) + } + } + } + } + + messages.forEach { message -> + outstandingConfirms[nextDeliveryTag] = message + + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "my_queue", + properties = Properties() + ) + + nextDeliveryTag++ + } + + while (outstandingConfirms.isNotEmpty()) { + delay(10) + } + + confirmJob.cancel() +} + +fun main() = runBlocking { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(this, config) + val channel = connection.openChannel() + + channel.queueDeclare("my_queue", false, false, true, emptyMap()) + + val messages = List(1000) { "Message $it" } + + val startTime = System.currentTimeMillis() + publishMessagesAsync(channel, messages) + val duration = System.currentTimeMillis() - startTime + + println("Published ${messages.size} messages in ${duration}ms") + + channel.close() + connection.close() +} +``` + +This tutorial is now complete. Note that publisher confirms is an advanced feature +and may not be necessary for all applications. For more information on publisher +confirms and other reliability features, see the [documentation on reliability](/docs/confirms). diff --git a/tutorials/tutorial-six-kotlin.md b/tutorials/tutorial-six-kotlin.md new file mode 100644 index 0000000000..4ade9ba957 --- /dev/null +++ b/tutorials/tutorial-six-kotlin.md @@ -0,0 +1,379 @@ +--- +title: RabbitMQ tutorial - Remote procedure call (RPC) +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T6DiagramFull from '@site/src/components/Tutorials/T6DiagramFull.md'; + +# RabbitMQ tutorial - Remote procedure call (RPC) + +## Remote procedure call (RPC) +### (using the Kotlin Client) + + + + +In the [second tutorial](./tutorial-two-kotlin) we learned how to +use _Work Queues_ to distribute time-consuming tasks among multiple +workers. + +But what if we need to run a function on a remote computer and wait for +the result? Well, that's a different story. This pattern is commonly +known as _Remote Procedure Call_ or _RPC_. + +In this tutorial we're going to use RabbitMQ to build an RPC system: a +client and a scalable RPC server. As we don't have any time-consuming +tasks that are worth distributing, we're going to create a dummy RPC +service that returns Fibonacci numbers. + +Client interface +---------------- + +To illustrate how an RPC service could be used we're going to +create a simple client function. It will send an RPC +request and wait until the answer is received: + +```kotlin +val result = rpcClient(this, 30) +println("fib(30) = $result") +``` + +> #### A note on RPC +> +> Although RPC is a pretty common pattern in computing, it's often criticised. +> The problems arise when a programmer is not aware whether a function call +> is local or if it's a slow RPC. Confusions like that result in an +> unpredictable system and adds unnecessary complexity to debugging. +> Instead of simplifying software, misused RPC can result in unmaintainable +> spaghetti code. +> +> Bearing that in mind, consider the following advice: +> +> * Make sure it's obvious which function call is local and which is remote. +> * Document your system. Make the dependencies between components clear. +> * Handle error cases. How should the client react when the RPC server is +> down for a long time? +> +> When in doubt avoid RPC. If you can, you should use an asynchronous +> pipeline - instead of RPC-like blocking, results are asynchronously +> pushed to a next computation stage. + + +Callback queue +-------------- + +In general doing RPC over RabbitMQ is easy. A client sends a request +message and a server replies with a response message. In order to +receive a response we need to send a 'callback' queue address with the +request. We can use the default queue. Let's try it: + +```kotlin +val callbackQueueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() +) +val callbackQueueName = callbackQueueDeclared.queueName + +val requestProps = properties { + replyTo = callbackQueueName +} + +channel.basicPublish( + "30".toByteArray(), + exchange = "", + routingKey = "rpc_queue", + properties = requestProps +) + +// ... then code to read a response message from the callback queue ... +``` + +> #### Message properties +> +> The AMQP 0-9-1 protocol predefines a set of 14 properties that go with +> a message. Most of the properties are rarely used, with the exception of +> the following: +> +> * `deliveryMode`: Marks a message as persistent (with a value of 2) +> or transient (any other value). +> * `contentType`: Used to describe the mime-type of the encoding. +> For example for the often used JSON encoding it is a good practice +> to set this property to: `application/json`. +> * `replyTo`: Commonly used to name a callback queue. +> * `correlationId`: Useful to correlate RPC responses with requests. + +Correlation Id +-------------- + +In the method presented above we suggest creating a callback queue for +every RPC request. That's pretty inefficient, but fortunately there is +a better way - let's create a single callback queue per client. + +That raises a new issue, having received a response in that queue it's +not clear to which request the response belongs. That's when the +`correlationId` property is used. We're going to set it to a unique +value for every request. Later, when we receive a message in the +callback queue we'll look at this property, and based on that we'll be +able to match a response with a request. If we see an unknown +`correlationId` value, we may safely discard the message - it doesn't +belong to our requests. + +You may ask, why should we ignore unknown messages in the callback +queue, rather than failing with an error? It's due to a possibility of +a race condition on the server side. Although unlikely, it is possible +that the RPC server will die just after sending us the answer, but +before sending an acknowledgment message for the request. If that +happens, the restarted RPC server will process the request again. +That's why on the client we must handle the duplicate responses +gracefully, and the RPC should ideally be idempotent. + +Summary +------- + + + +Our RPC will work like this: + + * For an RPC request, the Client sends a message with two properties: + `replyTo`, which is set to a callback queue created just for the request, + and `correlationId`, which is set to a unique value for every request. + * The request is sent to an `rpc_queue` queue. + * The RPC worker (aka: server) is waiting for requests on that queue. + When a request appears, it does the job and sends a message with the + result back to the Client, using the queue from the `replyTo` field. + * The client waits for data on the callback queue. When a message + appears, it checks the `correlationId` property. If it matches + the value from the request it returns the response to the application. + +Putting it all together +----------------------- + +The Fibonacci function: + +```kotlin +private fun fib(n: Int): Int { + return when { + n == 0 -> 0 + n == 1 -> 1 + else -> fib(n - 1) + fib(n - 2) + } +} +``` + +We declare our fibonacci function. It assumes only valid positive integer input. +(Don't expect this one to work for big numbers, +and it's probably the slowest recursive implementation possible). + + +The code for our RPC server: + +```kotlin +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import dev.kourier.amqp.properties +import kotlinx.coroutines.CoroutineScope + +suspend fun rpcServer(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + "rpc_queue", + durable = false, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + + channel.basicQos(count = 1u, global = false) + + println(" [x] Awaiting RPC requests") + + val consumer = channel.basicConsume("rpc_queue", noAck = false) + + for (delivery in consumer) { + val props = delivery.message.properties + val correlationId = props.correlationId + val replyTo = props.replyTo + + val requestMessage = delivery.message.body.decodeToString() + val n = requestMessage.toIntOrNull() ?: 0 + + println(" [.] fib($n)") + val response = fib(n) + + val replyProps = properties { + this.correlationId = correlationId + } + + if (replyTo != null) { + channel.basicPublish( + response.toString().toByteArray(), + exchange = "", + routingKey = replyTo, + properties = replyProps + ) + } + + channel.basicAck(delivery.message, multiple = false) + } + + channel.close() + connection.close() +} + +private fun fib(n: Int): Int { + return when { + n == 0 -> 0 + n == 1 -> 1 + else -> fib(n - 1) + fib(n - 2) + } +} +``` + +The server code is rather straightforward: + + * As usual we start by establishing the connection and declaring the queue. + * We might want to run more than one server process. In order to spread + the load equally over multiple servers we need to set the + `basicQos` setting. + * We use `basicConsume` to access the queue, where we provide the callback + that will do the work and send the response back. + + +The code for our RPC client: + +```kotlin +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import dev.kourier.amqp.properties +import kotlinx.coroutines.CoroutineScope +import java.util.UUID + +suspend fun rpcClient(coroutineScope: CoroutineScope, n: Int): Int { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + val callbackQueueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() + ) + val callbackQueueName = callbackQueueDeclared.queueName + + val correlationId = UUID.randomUUID().toString() + + val consumer = channel.basicConsume(callbackQueueName, noAck = true) + var result = 0 + + val requestProps = properties { + this.correlationId = correlationId + this.replyTo = callbackQueueName + } + + channel.basicPublish( + n.toString().toByteArray(), + exchange = "", + routingKey = "rpc_queue", + properties = requestProps + ) + println(" [x] Requesting fib($n)") + + for (delivery in consumer) { + val responseCorrelationId = delivery.message.properties.correlationId + + if (responseCorrelationId == correlationId) { + result = delivery.message.body.decodeToString().toInt() + println(" [.] Got $result") + break + } + } + + channel.close() + connection.close() + + return result +} +``` + +We establish a connection and channel. We declare an exclusive callback queue +for replies. We subscribe to the callback queue, so that we can receive RPC +responses. We generate a unique `correlationId` number and save it. The loop +is waiting for an appropriate response and whenever we get a response we check +if the `correlationId` is the one we're looking for. If so, we save the response. + +Making the RPC request: + +```kotlin +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + // Start RPC server + launch { + rpcServer(this) + } + + delay(1000) + + // Make RPC call + val result = rpcClient(this, 30) + println("fib(30) = $result") +} +``` + +The presented design is not the only possible implementation of an RPC +service, but it has some important advantages: + + * If the RPC server is too slow, you can scale up by just running + another one. Try running a second RPC server. + * On the client side, the RPC requires sending and receiving only one + message. No synchronous calls like `queueDeclare` are required. As a + result the RPC client needs only one network round trip for a single + RPC request. + +Our code is still pretty simplistic and doesn't try to solve more +complex (but important) problems, like: + + * How should the client react if there are no servers running? + * Should a client have some kind of timeout for the RPC? + * If the server malfunctions and raises an exception, should it be + forwarded to the client? + * Protecting against invalid incoming messages (eg checking bounds, + type) before processing. + +>If you want to experiment, you may find the [management UI](/docs/management) useful for viewing the queues. + +Move on to [tutorial 7](./tutorial-seven-kotlin) to learn about publisher confirms. diff --git a/tutorials/tutorial-three-kotlin.md b/tutorials/tutorial-three-kotlin.md new file mode 100644 index 0000000000..9224e6126e --- /dev/null +++ b/tutorials/tutorial-three-kotlin.md @@ -0,0 +1,366 @@ +--- +title: RabbitMQ tutorial - Publish/Subscribe +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T3DiagramToC from '@site/src/components/Tutorials/T3DiagramToC.md'; +import T3DiagramBinding from '@site/src/components/Tutorials/T3DiagramBinding.md'; +import T3DiagramFull from '@site/src/components/Tutorials/T3DiagramFull.md'; + +# RabbitMQ tutorial - Publish/Subscribe + +## Publish/Subscribe +### (using the Kotlin Client) + + + + + +In the [previous tutorial](./tutorial-two-kotlin) we created a work +queue. The assumption behind a work queue is that each task is +delivered to exactly one worker. In this part we'll do something +completely different -- we'll deliver a message to multiple +consumers. This pattern is known as "publish/subscribe". + +To illustrate the pattern, we're going to build a simple logging +system. It will consist of two programs -- the first will emit log +messages and the second will receive and print them. + +In our logging system every running copy of the receiver program will +get the messages. That way we can run one receiver and direct the logs +to disk; and at the same time we can run another receiver and see the +logs on the screen. + +Essentially, published log messages are going to be broadcast to all +the receivers. + +Exchanges +--------- + +In previous parts of the tutorial we sent and received messages to and +from a queue. Now it's time to introduce the full messaging model in +Rabbit. + +Let's quickly go over what we covered in the previous tutorials: + +* A _producer_ is a user application that sends messages. +* A _queue_ is a buffer that stores messages. +* A _consumer_ is a user application that receives messages. + +The core idea in the messaging model in RabbitMQ is that the producer +never sends any messages directly to a queue. Actually, quite often +the producer doesn't even know if a message will be delivered to any +queue at all. + +Instead, the producer can only send messages to an _exchange_. An +exchange is a very simple thing. On one side it receives messages from +producers and the other side it pushes them to queues. The exchange +must know exactly what to do with a message it receives. Should it be +appended to a particular queue? Should it be appended to many queues? +Or should it get discarded. The rules for that are defined by the +_exchange type_. + + + +There are a few exchange types available: `direct`, `topic`, `headers` +and `fanout`. We'll focus on the last one -- the fanout. Let's create +an exchange of that type, and call it `logs`: + +```kotlin +channel.exchangeDeclare( + "logs", + BuiltinExchangeType.FANOUT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() +) +``` + +The fanout exchange is very simple. As you can probably guess from the +name, it just broadcasts all the messages it receives to all the +queues it knows. And that's exactly what we need for our logger. + + +> #### Listing exchanges +> +> To list the exchanges on the server you can run the ever useful `rabbitmqctl`: +> +> ```bash +> sudo rabbitmqctl list_exchanges +> ``` +> +> In this list there will be some `amq.*` exchanges and the default (unnamed) +> exchange. These are created by default, but it is unlikely you'll need to +> use them at the moment. + + +> #### The default exchange +> +> In previous parts of the tutorial we knew nothing about exchanges, +> but still were able to send messages to queues. That was possible +> because we were using a default exchange, which we identify by the empty string (`""`). +> +> Recall how we published a message before: +> +> ```kotlin +> channel.basicPublish( +> message.toByteArray(), +> exchange = "", +> routingKey = "hello", +> properties = Properties() +> ) +> ``` +> +> The `exchange` parameter is the name of the exchange. The empty string +> denotes the default or _nameless_ exchange: messages are routed to +> the queue with the name specified by `routingKey`, if it exists. + +Now, we can publish to our named exchange instead: + +```kotlin +channel.basicPublish( + message.toByteArray(), + exchange = "logs", + routingKey = "", + properties = Properties() +) +``` + +Temporary queues +---------------- + +As you may remember previously we were using queues that had +specific names (remember `hello` and `task_queue`?). Being able to name +a queue was crucial for us -- we needed to point the workers to the +same queue. Giving a queue a name is important when you +want to share the queue between producers and consumers. + +But that's not the case for our logger. We want to hear about all +log messages, not just a subset of them. We're +also interested only in currently flowing messages not in the old +ones. To solve that we need two things. + +Firstly, whenever we connect to Rabbit we need a fresh, empty queue. +To do this we could create a queue with a random name, or, +even better - let the server choose a random queue name for us. + +Secondly, once we disconnect the consumer the queue should be +automatically deleted. + +In the Kotlin client, when we supply an empty string as the queue name, we +create a non-durable queue with a generated name: + +```kotlin +val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() +) +val queueName = queueDeclared.queueName +``` + +At this point `queueName` contains a random queue name. For example +it may look like `amq.gen-JzTY20BRgKO-HjmUJj0wLg`. + +When the connection that declared it closes, the queue will be deleted +because it is declared as exclusive. + +Bindings +-------- + + + +We've already created a fanout exchange and a queue. Now we need to +tell the exchange to send messages to our queue. That relationship +between exchange and a queue is called a _binding_. + +```kotlin +channel.queueBind( + queue = queueName, + exchange = "logs", + routingKey = "" +) +``` + +From now on the `logs` exchange will append messages to our queue. + +> #### Listing bindings +> +> You can list existing bindings using, you guessed it, +> ```bash +> rabbitmqctl list_bindings +> ``` + +Putting it all together +----------------------- + + + +The producer program, which emits log messages, doesn't look much +different from the previous tutorial. The most important change is that +we now want to publish messages to our `logs` exchange instead of the +nameless one. We need to supply a `routingKey` when sending, but its +value is ignored for `fanout` exchanges. Here goes the code for +`emitLog` function: + +```kotlin +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.Properties +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope + +suspend fun emitLog(coroutineScope: CoroutineScope, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.exchangeDeclare( + "logs", + BuiltinExchangeType.FANOUT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + channel.basicPublish( + message.toByteArray(), + exchange = "logs", + routingKey = "", + properties = Properties() + ) + println(" [x] Sent '$message'") + + channel.close() + connection.close() +} +``` + +As you can see, after establishing the connection we declared the +exchange. This step is necessary as publishing to a non-existing +exchange is forbidden. + +The messages will be lost if no queue is bound to the exchange yet, +but that's okay for us; if no consumer is listening yet we can safely discard the message. + +The code for `receiveLogs`: + +```kotlin +import dev.kourier.amqp.BuiltinExchangeType +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope + +suspend fun receiveLogs(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.exchangeDeclare( + "logs", + BuiltinExchangeType.FANOUT, + durable = false, + autoDelete = false, + internal = false, + arguments = emptyMap() + ) + + val queueDeclared = channel.queueDeclare( + name = "", + durable = false, + exclusive = true, + autoDelete = true, + arguments = emptyMap() + ) + val queueName = queueDeclared.queueName + + channel.queueBind( + queue = queueName, + exchange = "logs", + routingKey = "" + ) + + println(" [*] Waiting for logs. To exit press CTRL+C") + + val consumer = channel.basicConsume(queueName, noAck = true) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [x] $message") + } + + channel.close() + connection.close() +} +``` + +We're done. If you want to save logs to a file, just open a console and run: + +```kotlin +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + launch { receiveLogs(this) } + // In practice, you would redirect output to a file +} +``` + +If you wish to see the logs on your screen, spawn a new terminal and run the receiver again. + +And of course, to emit logs: + +```kotlin +import kotlinx.coroutines.runBlocking + +fun main() = runBlocking { + emitLog(this, "info: Application started") +} +``` + +Using `rabbitmqctl list_bindings` you can verify that the code actually +creates bindings and queues as we want. With two `receiveLogs` programs +running you should see something like: + +```bash +sudo rabbitmqctl list_bindings +# => Listing bindings ... +# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] +# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] +# => ...done. +``` + +The interpretation of the result is straightforward: data from +exchange `logs` goes to two queues with server-assigned names. And +that's exactly what we intended. + +To find out how to listen for a subset of messages, let's move on to +[tutorial 4](./tutorial-four-kotlin) diff --git a/tutorials/tutorial-two-kotlin.md b/tutorials/tutorial-two-kotlin.md new file mode 100644 index 0000000000..de01cb0302 --- /dev/null +++ b/tutorials/tutorial-two-kotlin.md @@ -0,0 +1,456 @@ +--- +title: RabbitMQ tutorial - Work Queues +--- + + +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; +import T2DiagramToC from '@site/src/components/Tutorials/T2DiagramToC.md'; +import T2DiagramPrefetch from '@site/src/components/Tutorials/T2DiagramPrefetch.md'; + +# RabbitMQ tutorial - Work Queues + +## Work Queues +### (using the Kotlin Client) + + + + + +In the [first tutorial](./tutorial-one-kotlin) we +wrote programs to send and receive messages from a named queue. In this +one we'll create a _Work Queue_ that will be used to distribute +time-consuming tasks among multiple workers. + +The main idea behind Work Queues (aka: _Task Queues_) is to avoid +doing a resource-intensive task immediately and having to wait for +it to complete. Instead we schedule the task to be done later. We encapsulate a +_task_ as a message and send it to a queue. A worker process running +in the background will pop the tasks and eventually execute the +job. When you run many workers the tasks will be shared between them. + +This concept is especially useful in web applications where it's +impossible to handle a complex task during a short HTTP request +window. + +Preparation +----------- + +In the previous part of this tutorial we sent a message containing +"Hello World!". Now we'll be sending strings that stand for complex +tasks. We don't have a real-world task, like images to be resized or +pdf files to be rendered, so let's fake it by just pretending we're +busy - by using the `delay()` function. We'll take the number of dots +in the string as its complexity; every dot will account for one second +of "work". For example, a fake task described by `Hello...` +will take three seconds. + +We will slightly modify the `send` function from our previous example, +to allow messages to be sent as tasks. This +program will schedule tasks to our work queue, so let's name it +`newTask`: + +```kotlin +suspend fun newTask(coroutineScope: CoroutineScope, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + + val properties = properties { + deliveryMode = 2u // Persistent message + } + + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "task_queue", + properties = properties + ) + println(" [x] Sent '$message'") + + channel.close() + connection.close() +} +``` + +Our old `receive` function also requires some changes: it needs to +fake a second of work for every dot in the message body. It will handle +delivered messages and perform the task, so let's call it `worker`: + +```kotlin +suspend fun worker(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + println(" [*] Waiting for messages. To exit press CTRL+C") + + val consumer = channel.basicConsume("task_queue", noAck = true) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [x] Received '$message'") + + try { + doWork(message) + } finally { + println(" [x] Done") + } + } + + channel.close() + connection.close() +} +``` + +Our fake task to simulate execution time: + +```kotlin +private suspend fun doWork(task: String) { + for (ch in task) { + if (ch == '.') { + delay(1000) // Sleep for 1 second per dot + } + } +} +``` + +Round-robin dispatching +------------------------ + +One of the advantages of using a Task Queue is the ability to easily +parallelise work. If we are building up a backlog of work, we can just +add more workers and that way, scale easily. + +By default, RabbitMQ will send each message to the next consumer, +in sequence. On average every consumer will get the same number of +messages. This way of distributing messages is called round-robin. + +Message acknowledgment +---------------------- + +Doing a task can take a few seconds, you may wonder what happens if +a consumer starts a long task and it terminates before it completes. +With our current code, once RabbitMQ delivers a message to the consumer, it +immediately marks it for deletion. In this case, if you terminate a worker, +the message it was just processing is lost. The messages that were dispatched +to this particular worker but were not yet handled are also lost. + +But we don't want to lose any tasks. If a worker dies, we'd like the +task to be delivered to another worker. + +In order to make sure a message is never lost, RabbitMQ supports +[message _acknowledgments_](/docs/confirms). An ack(nowledgement) is sent back by the +consumer to tell RabbitMQ that a particular message has been received, +processed and that RabbitMQ is free to delete it. + +If a consumer dies (its channel is closed, connection is closed, or +TCP connection is lost) without sending an ack, RabbitMQ will +understand that a message wasn't processed fully and will re-queue it. +If there are other consumers online at the same time, it will then quickly +redeliver it to another consumer. That way you can be sure that no +message is lost, even if the workers occasionally die. + +A timeout (30 minutes by default) is enforced on consumer delivery acknowledgement. +This helps detect buggy (stuck) consumers that never acknowledge deliveries. +You can increase this timeout as described in +[Delivery Acknowledgement Timeout](/docs/consumers#acknowledgement-timeout). + +[Manual message acknowledgments](/docs/confirms) are turned off by default in the previous examples. +It's time to turn them on using the `noAck = false` flag and send a proper acknowledgment +from the worker, once we're done with a task. + +```kotlin +channel.basicQos(count = 1u, global = false) + +val consumer = channel.basicConsume("task_queue", noAck = false) + +for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [x] Received '$message'") + + try { + doWork(message) + println(" [x] Done") + } finally { + channel.basicAck(delivery.message, multiple = false) + } +} +``` + +Using this code, you can ensure that even if you terminate a worker using +CTRL+C while it was processing a message, nothing is lost. Soon +after the worker terminates, all unacknowledged messages are redelivered. + +Acknowledgement must be sent on the same channel that received the +delivery. Attempts to acknowledge using a different channel will result +in a channel-level protocol exception. See the [doc guide on confirmations](/docs/confirms) +to learn more. + +> #### Forgotten acknowledgment +> +> It's a common mistake to miss the `basicAck`. It's an easy error, +> but the consequences are serious. Messages will be redelivered +> when your client quits (which may look like random redelivery), but +> RabbitMQ will eat more and more memory as it won't be able to release +> any unacked messages. +> +> In order to debug this kind of mistake you can use `rabbitmqctl` +> to print the `messages_unacknowledged` field: +> +> ```bash +> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged +> ``` +> +> On Windows, drop the sudo: +> ```bash +> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged +> ``` + +Message durability +------------------ + +We have learned how to make sure that even if the consumer dies, the +task isn't lost. But our tasks will still be lost if RabbitMQ server stops. + +When RabbitMQ quits or crashes it will forget the queues and messages +unless you tell it not to. Two things are required to make sure that +messages aren't lost: we need to mark both the queue and messages as +durable. + +First, we need to make sure that the queue will survive a RabbitMQ node restart. +In order to do so, we need to declare it as _durable_: + +```kotlin +channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() +) +``` + +Although this command is correct by itself, it won't work in our present +setup. That's because we've already defined a queue called `hello` +which is not durable. RabbitMQ doesn't allow you to redefine an existing queue +with different parameters and will return an error to any program +that tries to do that. But there is a quick workaround - let's declare +a queue with different name, for example `task_queue`: + +```kotlin +channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() +) +``` + +This `durable` option change needs to be applied to both the producer +and consumer code. + +At this point we're sure that the `task_queue` queue won't be lost +even if RabbitMQ restarts. Now we need to mark our messages as persistent +- by setting `deliveryMode` property to `2u`. + +```kotlin +val properties = properties { + deliveryMode = 2u // Persistent message +} + +channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "task_queue", + properties = properties +) +``` + +> #### Note on message persistence +> +> Marking messages as persistent doesn't fully guarantee that a message +> won't be lost. Although it tells RabbitMQ to save the message to disk, +> there is still a short time window when RabbitMQ has accepted a message and +> hasn't saved it yet. Also, RabbitMQ doesn't do `fsync(2)` for every +> message -- it may be just saved to cache and not really written to the +> disk. The persistence guarantees aren't strong, but it's more than enough +> for our simple task queue. If you need a stronger guarantee then you can use +> [publisher confirms](/docs/confirms). + +Fair dispatch +------------- + + + +You might have noticed that the dispatching still doesn't work exactly +as we want. For example in a situation with two workers, when all +odd messages are heavy and even messages are light, one worker will be +constantly busy and the other one will do hardly any work. Well, +RabbitMQ doesn't know anything about that and will still dispatch +messages evenly. + +This happens because RabbitMQ just dispatches a message when the message +enters the queue. It doesn't look at the number of unacknowledged +messages for a consumer. It just blindly dispatches every n-th message +to the n-th consumer. + +In order to defeat that we can use the `basicQos` method with the +`count = 1u` setting. This tells RabbitMQ not to give more than +one message to a worker at a time. Or, in other words, don't dispatch +a new message to a worker until it has processed and acknowledged the +previous one. Instead, it will dispatch it to the next worker that is not still busy. + +```kotlin +channel.basicQos(count = 1u, global = false) +``` + +> #### Note about queue size +> +> If all the workers are busy, your queue can fill up. You will want to keep an +> eye on that, and maybe add more workers, or use [message TTL](/docs/ttl). + +Putting it all together +----------------------- + +Final code of our `newTask` function: + +```kotlin +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import dev.kourier.amqp.properties +import kotlinx.coroutines.CoroutineScope + +suspend fun newTask(coroutineScope: CoroutineScope, message: String) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + + val properties = properties { + deliveryMode = 2u // Persistent message + } + + channel.basicPublish( + message.toByteArray(), + exchange = "", + routingKey = "task_queue", + properties = properties + ) + println(" [x] Sent '$message'") + + channel.close() + connection.close() +} +``` + +And our `worker`: + +```kotlin +import dev.kourier.amqp.connection.amqpConfig +import dev.kourier.amqp.connection.createAMQPConnection +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay + +suspend fun worker(coroutineScope: CoroutineScope) { + val config = amqpConfig { + server { + host = "localhost" + } + } + val connection = createAMQPConnection(coroutineScope, config) + val channel = connection.openChannel() + + channel.queueDeclare( + "task_queue", + durable = true, + exclusive = false, + autoDelete = false, + arguments = emptyMap() + ) + println(" [*] Waiting for messages. To exit press CTRL+C") + + channel.basicQos(count = 1u, global = false) + + val consumer = channel.basicConsume("task_queue", noAck = false) + + for (delivery in consumer) { + val message = delivery.message.body.decodeToString() + println(" [x] Received '$message'") + + try { + doWork(message) + println(" [x] Done") + } finally { + channel.basicAck(delivery.message, multiple = false) + } + } + + channel.close() + connection.close() +} + +private suspend fun doWork(task: String) { + for (ch in task) { + if (ch == '.') { + delay(1000) // Sleep for 1 second per dot + } + } +} +``` + +Using message acknowledgments and `basicQos` you can set up a +work queue. The durability options let the tasks survive even if +RabbitMQ is restarted. + +Now we can move on to [tutorial 3](./tutorial-three-kotlin) and learn how +to deliver the same message to many consumers. From 6a40e66b23cf2ac966ed39a1cfb5301df9d2d46e Mon Sep 17 00:00:00 2001 From: NathanFallet Date: Sat, 29 Nov 2025 13:45:02 +0100 Subject: [PATCH 2/2] fix ordering --- tutorials/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/index.md b/tutorials/index.md index e3934b44cb..08d6d9fdd8 100644 --- a/tutorials/index.md +++ b/tutorials/index.md @@ -197,8 +197,8 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1. Reliable publishing with publisher confirms * [Java](tutorials/tutorial-seven-java) - * [C#](tutorials/tutorial-seven-dotnet) * [Kotlin](tutorials/tutorial-seven-kotlin) + * [C#](tutorials/tutorial-seven-dotnet) * [PHP](tutorials/tutorial-seven-php)