diff --git a/tutorials/index.md b/tutorials/index.md
index caceec07e..08d6d9fdd 100644
--- a/tutorials/index.md
+++ b/tutorials/index.md
@@ -72,6 +72,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
* [Python](tutorials/tutorial-one-python)
* [Java](tutorials/tutorial-one-java)
+ * [Kotlin](tutorials/tutorial-one-kotlin)
* [Ruby](tutorials/tutorial-one-ruby)
* [PHP](tutorials/tutorial-one-php)
* [C#](tutorials/tutorial-one-dotnet)
@@ -92,6 +93,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
* [Python](tutorials/tutorial-two-python)
* [Java](tutorials/tutorial-two-java)
+ * [Kotlin](tutorials/tutorial-two-kotlin)
* [Ruby](tutorials/tutorial-two-ruby)
* [PHP](tutorials/tutorial-two-php)
* [C#](tutorials/tutorial-two-dotnet)
@@ -112,6 +114,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
* [Python](tutorials/tutorial-three-python)
* [Java](tutorials/tutorial-three-java)
+ * [Kotlin](tutorials/tutorial-three-kotlin)
* [Ruby](tutorials/tutorial-three-ruby)
* [PHP](tutorials/tutorial-three-php)
* [C#](tutorials/tutorial-three-dotnet)
@@ -134,6 +137,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
* [Python](tutorials/tutorial-four-python)
* [Java](tutorials/tutorial-four-java)
+ * [Kotlin](tutorials/tutorial-four-kotlin)
* [Ruby](tutorials/tutorial-four-ruby)
* [PHP](tutorials/tutorial-four-php)
* [C#](tutorials/tutorial-four-dotnet)
@@ -154,6 +158,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
* [Python](tutorials/tutorial-five-python)
* [Java](tutorials/tutorial-five-java)
+ * [Kotlin](tutorials/tutorial-five-kotlin)
* [Ruby](tutorials/tutorial-five-ruby)
* [PHP](tutorials/tutorial-five-php)
* [C#](tutorials/tutorial-five-dotnet)
@@ -174,6 +179,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
* [Python](tutorials/tutorial-six-python)
* [Java](tutorials/tutorial-six-java)
+ * [Kotlin](tutorials/tutorial-six-kotlin)
* [Ruby](tutorials/tutorial-six-ruby)
* [PHP](tutorials/tutorial-six-php)
* [C#](tutorials/tutorial-six-dotnet)
@@ -191,6 +197,7 @@ This section covers the default RabbitMQ protocol, AMQP 0-9-1.
Reliable publishing with publisher confirms
* [Java](tutorials/tutorial-seven-java)
+ * [Kotlin](tutorials/tutorial-seven-kotlin)
* [C#](tutorials/tutorial-seven-dotnet)
* [PHP](tutorials/tutorial-seven-php)
diff --git a/tutorials/tutorial-five-kotlin.md b/tutorials/tutorial-five-kotlin.md
new file mode 100644
index 000000000..829087c03
--- /dev/null
+++ b/tutorials/tutorial-five-kotlin.md
@@ -0,0 +1,292 @@
+---
+title: RabbitMQ tutorial - Topics
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+import T5DiagramToC from '@site/src/components/Tutorials/T5DiagramToC.md';
+import T5DiagramTopicX from '@site/src/components/Tutorials/T5DiagramTopicX.md';
+
+# RabbitMQ tutorial - Topics
+
+## Topics
+### (using the Kotlin Client)
+
+
+
+
+
+In the [previous tutorial](./tutorial-four-kotlin) we improved our
+logging system. Instead of using a `fanout` exchange only capable of
+dummy broadcasting, we used a `direct` one, and gained a possibility
+of selectively receiving the logs.
+
+Although using the `direct` exchange improved our system, it still has
+limitations - it can't do routing based on multiple criteria.
+
+In our logging system we might want to subscribe to not only logs
+based on severity, but also based on the source which emitted the log.
+You might know this concept from the
+[`syslog`](http://en.wikipedia.org/wiki/Syslog) unix tool, which
+routes logs based on both severity (info/warn/crit...) and facility
+(auth/cron/kern...).
+
+That would give us a lot of flexibility - we may want to listen to just
+critical errors coming from 'cron' but also all logs from 'kern'.
+
+To implement that in our logging system we need to learn about a more
+complex `topic` exchange.
+
+
+Topic exchange
+--------------
+
+Messages sent to a `topic` exchange can't have an arbitrary
+`routing_key` - it must be a list of words, delimited by dots. The
+words can be anything, but usually they specify some features
+connected to the message. A few valid routing key examples:
+"`stock.usd.nyse`", "`nyse.vmw`", "`quick.orange.rabbit`". There can be as
+many words in the routing key as you like, up to the limit of 255
+bytes.
+
+The binding key must also be in the same form. The logic behind the
+`topic` exchange is similar to a `direct` one - a message sent with a
+particular routing key will be delivered to all the queues that are
+bound with a matching binding key. However there are two important
+special cases for binding keys:
+
+ * `*` (star) can substitute for exactly one word.
+ * `#` (hash) can substitute for zero or more words.
+
+It's easiest to explain this in an example:
+
+
+
+In this example, we're going to send messages which all describe
+animals. The messages will be sent with a routing key that consists of
+three words (two dots). The first word in the routing key will
+describe speed, second a colour and third a species:
+"`..`".
+
+We created three bindings: Q1 is bound with binding key "`*.orange.*`"
+and Q2 with "`*.*.rabbit`" and "`lazy.#`".
+
+These bindings can be summarised as:
+
+ * Q1 is interested in all the orange animals.
+ * Q2 wants to hear everything about rabbits, and everything about lazy
+ animals.
+
+A message with a routing key set to "`quick.orange.rabbit`"
+will be delivered to both queues. Message "`lazy.orange.elephant`"
+also will go to both of them. On the other hand
+"`quick.orange.fox`" will only go to the first queue, and
+"`lazy.brown.fox`" only to the second. "`lazy.pink.rabbit`" will
+be delivered to the second queue only once, even though it matches two bindings.
+"`quick.brown.fox`" doesn't match any binding so it will be discarded.
+
+What happens if we break our contract and send a message with one or
+four words, like "`orange`" or "`quick.orange.male.rabbit`"? Well,
+these messages won't match any bindings and will be lost.
+
+On the other hand "`lazy.orange.male.rabbit`", even though it has four
+words, will match the last binding and will be delivered to the second
+queue.
+
+> #### Topic exchange
+>
+> Topic exchange is powerful and can behave like other exchanges.
+>
+> When a queue is bound with "`#`" (hash) binding key - it will receive
+> all the messages, regardless of the routing key - like in `fanout` exchange.
+>
+> When special characters "`*`" (star) and "`#`" (hash) aren't used in bindings,
+> the topic exchange will behave just like a `direct` one.
+
+Putting it all together
+-----------------------
+
+We're going to use a `topic` exchange in our logging system. We'll
+start off with a working assumption that the routing keys of logs will
+have two words: "`.`".
+
+The code is almost the same as in the
+[previous tutorial](./tutorial-four-kotlin).
+
+The code for `emitLogTopic`:
+
+```kotlin
+import dev.kourier.amqp.BuiltinExchangeType
+import dev.kourier.amqp.Properties
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun emitLogTopic(coroutineScope: CoroutineScope, routingKey: String, message: String) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.exchangeDeclare(
+ "topic_logs",
+ BuiltinExchangeType.TOPIC,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+ )
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "topic_logs",
+ routingKey = routingKey,
+ properties = Properties()
+ )
+ println(" [x] Sent '$routingKey':'$message'")
+
+ channel.close()
+ connection.close()
+}
+```
+
+The code for `receiveLogsTopic`:
+
+```kotlin
+import dev.kourier.amqp.BuiltinExchangeType
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun receiveLogsTopic(
+ coroutineScope: CoroutineScope,
+ bindingKeys: List
+) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.exchangeDeclare(
+ "topic_logs",
+ BuiltinExchangeType.TOPIC,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+ )
+
+ val queueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+ )
+ val queueName = queueDeclared.queueName
+
+ if (bindingKeys.isEmpty()) {
+ System.err.println("Usage: receiveLogsTopic [binding_key]...")
+ return
+ }
+
+ for (bindingKey in bindingKeys) {
+ channel.queueBind(
+ queue = queueName,
+ exchange = "topic_logs",
+ routingKey = bindingKey
+ )
+ }
+
+ println(" [*] Waiting for logs. To exit press CTRL+C")
+
+ val consumer = channel.basicConsume(queueName, noAck = true)
+
+ for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ val routingKey = delivery.message.routingKey
+ println(" [x] Received '$routingKey':'$message'")
+ }
+
+ channel.close()
+ connection.close()
+}
+```
+
+To receive all the logs:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ receiveLogsTopic(this, listOf("#"))
+}
+```
+
+To receive all logs from the facility "`kern`":
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ receiveLogsTopic(this, listOf("kern.*"))
+}
+```
+
+Or if you want to hear only about "`critical`" logs:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ receiveLogsTopic(this, listOf("*.critical"))
+}
+```
+
+You can create multiple bindings:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ receiveLogsTopic(this, listOf("kern.*", "*.critical"))
+}
+```
+
+And to emit a log with a routing key "`kern.critical`" type:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ emitLogTopic(this, "kern.critical", "A critical kernel error")
+}
+```
+
+Have fun playing with these programs. Note that the code doesn't make
+any assumption about the routing or binding keys, you may want to play
+with more than two routing key parameters.
+
+Move on to [tutorial 6](./tutorial-six-kotlin) to find out how to do a
+round trip message as a remote procedure call.
diff --git a/tutorials/tutorial-four-kotlin.md b/tutorials/tutorial-four-kotlin.md
new file mode 100644
index 000000000..7ed9d4080
--- /dev/null
+++ b/tutorials/tutorial-four-kotlin.md
@@ -0,0 +1,309 @@
+---
+title: RabbitMQ tutorial - Routing
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+import T4DiagramDirectX from '@site/src/components/Tutorials/T4DiagramDirectX.md';
+import T4DiagramMultipleBindings from '@site/src/components/Tutorials/T4DiagramMultipleBindings.md';
+import T4DiagramFull from '@site/src/components/Tutorials/T4DiagramFull.md';
+
+# RabbitMQ tutorial - Routing
+
+## Routing
+### (using the Kotlin Client)
+
+
+
+In the [previous tutorial](./tutorial-three-kotlin) we built a
+simple logging system. We were able to broadcast log messages to many
+receivers.
+
+In this tutorial we're going to add a feature to it - we're going to
+make it possible to subscribe only to a subset of the messages. For
+example, we will be able to direct only critical error messages to the
+log file (to save disk space), while still being able to print all of
+the log messages on the console.
+
+
+Bindings
+--------
+
+In previous examples we were already creating bindings. You may recall
+code like:
+
+```kotlin
+channel.queueBind(queueName, "logs", routingKey = "")
+```
+
+A binding is a relationship between an exchange and a queue. This can
+be simply read as: the queue is interested in messages from this
+exchange.
+
+Bindings can take an extra `routingKey` parameter. To avoid the
+confusion with a `basicPublish` parameter we're going to call it a
+`binding key`. This is how we could create a binding with a key:
+
+```kotlin
+channel.queueBind(queueName, "direct_logs", routingKey = "black")
+```
+
+The meaning of a binding key depends on the exchange type. The
+`fanout` exchanges, which we used previously, simply ignored its
+value.
+
+Direct exchange
+---------------
+
+Our logging system from the previous tutorial broadcasts all messages
+to all consumers. We want to extend that to allow filtering messages
+based on their severity. For example we may want a program which
+writes log messages to the disk to only receive critical errors, and
+not waste disk space on warning or info log messages.
+
+We were using a `fanout` exchange, which doesn't give us much
+flexibility - it's only capable of mindless broadcasting.
+
+We will use a `direct` exchange instead. The routing algorithm behind
+a `direct` exchange is simple - a message goes to the queues whose
+`binding key` exactly matches the `routing key` of the message.
+
+To illustrate that, consider the following setup:
+
+
+
+In this setup, we can see the `direct` exchange `X` with two queues bound
+to it. The first queue is bound with binding key `orange`, and the second
+has two bindings, one with binding key `black` and the other one
+with `green`.
+
+In such a setup a message published to the exchange with a routing key
+`orange` will be routed to queue `Q1`. Messages with a routing key of `black`
+or `green` will go to `Q2`. All other messages will be discarded.
+
+
+Multiple bindings
+-----------------
+
+
+It is perfectly legal to bind multiple queues with the same binding
+key. In our example we could add a binding between `X` and `Q1` with
+binding key `black`. In that case, the `direct` exchange will behave
+like `fanout` and will broadcast the message to all the matching
+queues. A message with routing key `black` will be delivered to both
+`Q1` and `Q2`.
+
+
+Emitting logs
+-------------
+
+We'll use this model for our logging system. Instead of `fanout` we'll
+send messages to a `direct` exchange. We will supply the log severity as
+a `routing key`. That way the receiving program will be able to select
+the severity it wants to receive. Let's focus on emitting logs
+first.
+
+As always, we need to create an exchange first:
+
+```kotlin
+channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT)
+```
+
+And we're ready to send a message:
+
+```kotlin
+channel.basicPublish(
+ message.toByteArray(),
+ exchange = "direct_logs",
+ routingKey = severity,
+ properties = Properties()
+)
+```
+
+To simplify things we will assume that 'severity' can be one of
+`info`, `warning`, or `error`.
+
+
+Subscribing
+-----------
+
+Receiving messages will work just like in the previous tutorial, with
+one exception - we're going to create a new binding for each severity
+we're interested in.
+
+```kotlin
+val queueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+)
+val queueName = queueDeclared.queueName
+
+for (severity in severities) {
+ channel.queueBind(queueName, "direct_logs", routingKey = severity)
+}
+```
+
+
+Putting it all together
+-----------------------
+
+
+
+
+The code for `emitLogDirect` function:
+
+```kotlin
+import dev.kourier.amqp.BuiltinExchangeType
+import dev.kourier.amqp.Properties
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun emitLogDirect(coroutineScope: CoroutineScope, severity: String, message: String) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.exchangeDeclare(
+ "direct_logs",
+ BuiltinExchangeType.DIRECT,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+ )
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "direct_logs",
+ routingKey = severity,
+ properties = Properties()
+ )
+ println(" [x] Sent '$severity':'$message'")
+
+ channel.close()
+ connection.close()
+}
+```
+
+The code for `receiveLogsDirect`:
+
+```kotlin
+import dev.kourier.amqp.BuiltinExchangeType
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun receiveLogsDirect(
+ coroutineScope: CoroutineScope,
+ severities: List
+) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.exchangeDeclare(
+ "direct_logs",
+ BuiltinExchangeType.DIRECT,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+ )
+
+ val queueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+ )
+ val queueName = queueDeclared.queueName
+
+ if (severities.isEmpty()) {
+ System.err.println("Usage: receiveLogsDirect [info] [warning] [error]")
+ return
+ }
+
+ for (severity in severities) {
+ channel.queueBind(
+ queue = queueName,
+ exchange = "direct_logs",
+ routingKey = severity
+ )
+ }
+ println(" [*] Waiting for messages. To exit press CTRL+C")
+
+ val consumer = channel.basicConsume(queueName, noAck = true)
+
+ for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ val routingKey = delivery.message.routingKey
+ println(" [x] Received '$routingKey':'$message'")
+ }
+
+ channel.close()
+ connection.close()
+}
+```
+
+If you want to save only 'warning' and 'error' (and not 'info') log
+messages to a file, just open a console and type:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ receiveLogsDirect(this, listOf("warning", "error"))
+}
+```
+
+If you'd like to see all the log messages on your screen, open a new
+terminal and do:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ receiveLogsDirect(this, listOf("info", "warning", "error"))
+}
+```
+
+And, for example, to emit an `error` log message just type:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ emitLogDirect(this, "error", "Run. Run. Or it will explode.")
+}
+```
+
+Move on to [tutorial 5](./tutorial-five-kotlin) to find out how to listen
+for messages based on a pattern.
diff --git a/tutorials/tutorial-one-kotlin.md b/tutorials/tutorial-one-kotlin.md
new file mode 100644
index 000000000..3f92dfc7a
--- /dev/null
+++ b/tutorials/tutorial-one-kotlin.md
@@ -0,0 +1,331 @@
+---
+title: RabbitMQ tutorial - "Hello World!"
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+import TutorialsIntro from '@site/src/components/Tutorials/TutorialsIntro.md';
+import T1DiagramHello from '@site/src/components/Tutorials/T1DiagramHello.md';
+import T1DiagramSending from '@site/src/components/Tutorials/T1DiagramSending.md';
+import T1DiagramReceiving from '@site/src/components/Tutorials/T1DiagramReceiving.md';
+
+# RabbitMQ tutorial - "Hello World!"
+
+## Introduction
+
+
+
+
+## "Hello World"
+### (using the Kotlin Client)
+
+In this part of the tutorial we'll write two programs in Kotlin; a
+producer that sends a single message, and a consumer that receives
+messages and prints them out. We'll gloss over some of the detail in
+the Kotlin API, concentrating on this very simple thing just to get
+started. It's the "Hello World" of messaging.
+
+In the diagram below, "P" is our producer and "C" is our consumer. The
+box in the middle is a queue - a message buffer that RabbitMQ keeps
+on behalf of the consumer.
+
+
+
+> #### The Kotlin client library
+>
+> RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open,
+> general-purpose protocol for messaging. There are a number of clients
+> for RabbitMQ in [many different
+> languages](/client-libraries/devtools). We'll
+> use the [Kourier](https://github.com/NathanFallet/Kourier) client library for Kotlin.
+>
+> Kourier is a modern, coroutine-based AMQP 0-9-1 client for Kotlin. To use it in your project,
+> add the following dependency:
+>
+> **Gradle (Kotlin DSL):**
+> ```kotlin
+> dependencies {
+> implementation("dev.kourier:kourier-amqp:1.0.0")
+> }
+> ```
+>
+> **Gradle (Groovy DSL):**
+> ```groovy
+> dependencies {
+> implementation 'dev.kourier:kourier-amqp:1.0.0'
+> }
+> ```
+>
+> **Maven:**
+> ```xml
+>
+> dev.kourier
+> kourier-amqp
+> 1.0.0
+>
+> ```
+
+Now we have the Kotlin client library set up, we can write some code.
+
+### Sending
+
+
+
+We'll call our message publisher (sender) `send` and our message consumer (receiver)
+`receive`. The publisher will connect to RabbitMQ, send a single message,
+then exit.
+
+We need some imports:
+
+```kotlin
+import dev.kourier.amqp.Properties
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+```
+
+Set up the send function and the queue name:
+
+```kotlin
+val queueName = "hello"
+
+suspend fun send(coroutineScope: CoroutineScope) {
+ // ...
+}
+```
+
+Then we can create a connection to the server:
+
+```kotlin
+suspend fun send(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ // Publishing code will go here...
+
+ channel.close()
+ connection.close()
+}
+```
+
+The connection abstracts the socket connection, and takes care of
+protocol version negotiation and authentication and so on for us. Here
+we connect to a RabbitMQ node on the local machine - hence the
+_localhost_. If we wanted to connect to a node on a different
+machine we'd simply specify its hostname or IP address here.
+
+Next we create a channel, which is where most of the API for getting
+things done resides.
+
+To send, we must declare a queue for us to send to; then we can publish a message
+to the queue:
+
+```kotlin
+channel.queueDeclare(
+ queueName,
+ durable = false,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+)
+val message = "Hello World!"
+channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = queueName,
+ properties = Properties()
+)
+println(" [x] Sent '$message'")
+```
+
+Declaring a queue is idempotent - it will only be created if it doesn't
+exist already. The message content is a byte array, so you can encode
+whatever you like there.
+
+> #### Sending doesn't work!
+>
+> If this is your first time using RabbitMQ and you don't see the "Sent"
+> message then you may be left scratching your head wondering what could
+> be wrong. Maybe the broker was started without enough free disk space
+> (by default it needs at least 50 MB free) and is therefore refusing to
+> accept messages. Check the broker [log file](/docs/logging/) to see if there
+> is a [resource alarm](/docs/alarms) logged and reduce the
+> free disk space threshold if necessary.
+> The [Configuration guide](/docs/configure#config-items)
+> will show you how to set disk_free_limit.
+
+
+### Receiving
+
+That's it for our publisher. Our consumer listens for messages from
+RabbitMQ, so unlike the publisher which publishes a single message, we'll
+keep the consumer running to listen for messages and print them out.
+
+
+
+Setting up is the same as the publisher; we open a connection and a
+channel, and declare the queue from which we're going to consume.
+Note this matches up with the queue that `send` publishes to.
+
+```kotlin
+suspend fun receive(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ queueName,
+ durable = false,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+ println(" [*] Waiting for messages. To exit press CTRL+C")
+
+ // Consuming code will go here...
+
+ channel.close()
+ connection.close()
+}
+```
+
+Note that we declare the queue here, as well. Because we might start
+the consumer before the publisher, we want to make sure the queue exists
+before we try to consume messages from it.
+
+We're about to tell the server to deliver us the messages from the
+queue. Since Kourier is built on Kotlin coroutines, consuming messages
+is as simple as iterating over a channel:
+
+```kotlin
+val consumer = channel.basicConsume(queueName, noAck = true)
+
+for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ println(" [x] Received '$message'")
+}
+```
+
+### Putting it all together
+
+You can wrap both functions in a `main` function with a `runBlocking` block:
+
+```kotlin
+import dev.kourier.amqp.Properties
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+
+val queueName = "hello"
+
+suspend fun send(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ queueName,
+ durable = false,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+ val message = "Hello World!"
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = queueName,
+ properties = Properties()
+ )
+ println(" [x] Sent '$message'")
+
+ channel.close()
+ connection.close()
+}
+
+suspend fun receive(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ queueName,
+ durable = false,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+ println(" [*] Waiting for messages. To exit press CTRL+C")
+
+ val consumer = channel.basicConsume(queueName, noAck = true)
+
+ for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ println(" [x] Received '$message'")
+ }
+
+ channel.close()
+ connection.close()
+}
+
+fun main() = runBlocking {
+ launch { send(this) }
+ launch { receive(this) }
+
+ delay(Long.MAX_VALUE) // Keep the main thread alive
+}
+```
+
+The consumer will print the message it gets from the publisher via
+RabbitMQ. The consumer will keep running, waiting for messages (Use Ctrl-C to stop it).
+
+> #### Listing queues
+>
+> You may wish to see what queues RabbitMQ has and how many
+> messages are in them. You can do it (as a privileged user) using the `rabbitmqctl` tool:
+>
+> ```bash
+> sudo rabbitmqctl list_queues
+> ```
+>
+> On Windows, omit the sudo:
+> ```PowerShell
+> rabbitmqctl.bat list_queues
+> ```
+
+
+Time to move on to [part 2](./tutorial-two-kotlin) and build a simple _work queue_.
diff --git a/tutorials/tutorial-seven-kotlin.md b/tutorials/tutorial-seven-kotlin.md
new file mode 100644
index 000000000..f705154b0
--- /dev/null
+++ b/tutorials/tutorial-seven-kotlin.md
@@ -0,0 +1,379 @@
+---
+title: RabbitMQ tutorial - Publisher Confirms
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+
+# RabbitMQ tutorial - Publisher Confirms
+
+## Publisher Confirms
+### (using the Kotlin Client)
+
+
+
+[Publisher confirms](/docs/confirms#publisher-confirms) are a RabbitMQ extension
+to implement reliable publishing. When publisher confirms are enabled on a
+channel, messages the client publishes are confirmed asynchronously by the
+broker, meaning they have been taken care of on the server side.
+
+### Overview
+
+In this tutorial we'll use publisher confirms to make sure published
+messages have safely reached the broker. We will cover several
+strategies for using publisher confirms and explain their pros and cons.
+
+### Enabling Publisher Confirms on a Channel
+
+Publisher confirms are a RabbitMQ extension to the AMQP 0.9.1 protocol.
+Publisher confirms are enabled at the channel level. To enable them, use the
+`confirmSelect` method:
+
+```kotlin
+channel.confirmSelect()
+```
+
+This method must be called on every channel that you expect to use publisher
+confirms. Confirms should be enabled just once, not for every message published.
+
+### Strategy #1: Publishing Messages Individually
+
+Let's start with the simplest approach to publishing with confirms,
+that is, publishing a message and waiting synchronously for its confirmation:
+
+```kotlin
+suspend fun publishMessagesIndividually(channel: AMQPChannel, messages: List) {
+ channel.confirmSelect()
+
+ for (message in messages) {
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "my_queue",
+ properties = Properties()
+ )
+
+ // Wait for confirm
+ val confirm = channel.publishConfirmResponses.first()
+
+ when (confirm) {
+ is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
+ println("✓ Message confirmed: $message")
+ }
+ is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
+ println("✗ Message rejected: $message")
+ // Handle rejection (retry, log, etc.)
+ }
+ }
+ }
+}
+```
+
+In the previous example we publish a message as usual and wait for its
+confirmation with the `first()` call. The method returns as
+soon as the message has been confirmed. If the message is not confirmed
+within the timeout or if it is nack-ed (meaning the broker could not take
+care of it for some reason), the method will throw an exception. The handling
+of the exception usually consists in logging an error message and/or retrying
+to send the message.
+
+Different client libraries have different ways to synchronously deal with publisher
+confirms, so make sure to read carefully the documentation of the client you are using.
+
+This technique is very straightforward but also has a major drawback:
+it **significantly slows down publishing**, as the confirmation of a message blocks
+the publishing of all subsequent messages. This approach is not going to
+deliver throughput of more than a few hundreds of published messages per second.
+Nevertheless, this can be good enough for some applications.
+
+> #### Are Publisher Confirms Asynchronous?
+>
+> We mentioned at the beginning that the broker confirms published
+> messages asynchronously but in the first example the code waits
+> synchronously until the message is confirmed. The client actually
+> receives confirms asynchronously and unblocks the call to `first()`
+> accordingly. Think of `first()` as a synchronous helper which
+> relies on asynchronous notifications under the hood.
+
+### Strategy #2: Publishing Messages in Batches
+
+To improve upon our previous example, we can publish a batch of messages
+and wait for this whole batch to be confirmed. The following example uses
+a batch of 100:
+
+```kotlin
+suspend fun publishMessagesInBatch(channel: AMQPChannel, messages: List, batchSize: Int) {
+ channel.confirmSelect()
+
+ messages.chunked(batchSize).forEach { batch ->
+ // Publish entire batch
+ batch.forEach { message ->
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "my_queue",
+ properties = Properties()
+ )
+ }
+
+ // Wait for all confirms for this batch
+ val confirms = channel.publishConfirmResponses.take(batch.size).toList()
+
+ val ackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Ack }
+ val nackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Nack }
+
+ println("Batch complete: $ackCount acks, $nackCount nacks")
+
+ if (nackCount > 0) {
+ // Handle failures (can't identify specific messages easily)
+ println("Warning: Some messages in batch were rejected")
+ }
+ }
+}
+```
+
+Waiting for a batch of messages to be confirmed improves throughput drastically over
+waiting for a confirm for individual message (up to 20-30 times with a remote RabbitMQ node).
+One drawback is that we do not know exactly what went wrong in case of failure,
+so we may have to keep a whole batch in memory to log something meaningful or
+to re-publish the messages. And this solution is still synchronous, so it
+blocks the publishing of messages.
+
+### Strategy #3: Handling Publisher Confirms Asynchronously
+
+The broker confirms published messages asynchronously, one just needs to
+register a callback on the client to be notified of these confirms:
+
+```kotlin
+suspend fun publishMessagesAsync(channel: AMQPChannel, messages: List) {
+ channel.confirmSelect()
+
+ val outstandingConfirms = mutableMapOf()
+ var nextDeliveryTag = 1UL
+
+ // Launch coroutine to handle confirms
+ val confirmJob = launch {
+ channel.publishConfirmResponses.collect { confirm ->
+ when (confirm) {
+ is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
+ if (confirm.multiple) {
+ // Remove all up to and including this tag
+ outstandingConfirms.keys.filter { it <= confirm.deliveryTag }
+ .forEach { outstandingConfirms.remove(it) }
+ } else {
+ outstandingConfirms.remove(confirm.deliveryTag)
+ }
+ }
+ is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
+ val message = outstandingConfirms[confirm.deliveryTag]
+ println("✗ Message nacked: $message")
+ // Handle specific message rejection
+ outstandingConfirms.remove(confirm.deliveryTag)
+ }
+ }
+ }
+ }
+
+ // Publish all messages
+ messages.forEach { message ->
+ outstandingConfirms[nextDeliveryTag] = message
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "my_queue",
+ properties = Properties()
+ )
+
+ nextDeliveryTag++
+ }
+
+ // Wait until all confirms are received
+ while (outstandingConfirms.isNotEmpty()) {
+ delay(10)
+ }
+
+ confirmJob.cancel()
+}
+```
+
+In this example we use Kotlin's Flow API to handle confirms asynchronously. We collect
+confirms from the `publishConfirmResponses` flow. The callback will be invoked for each
+confirmed message. We keep track of outstanding confirms with a map. When a confirm arrives,
+we remove the entry from the map. If the confirm indicates that multiple messages have been
+confirmed (the `multiple` field is `true`), we remove all messages up to and including the
+confirmed delivery tag.
+
+The async approach for handling confirms requires tracking of published messages. We use
+a concurrent map to correlate the publish delivery tag with the message content. This is
+necessary for logging meaningful information or to re-publish a message that has been
+nack-ed. The handling of confirms can also be decomposed into a fire-and-forget approach:
+a background task or flow can handle the confirms and update the map accordingly.
+
+### Summary
+
+Making sure published messages made it to the broker can be essential in some applications.
+Publisher confirms are a RabbitMQ feature that helps to meet this requirement. Publisher
+confirms are asynchronous in nature but it is also possible to handle them synchronously.
+There is no definitive way to implement publisher confirms, this usually comes down to
+the constraints in the application and in the overall system. Typical techniques are:
+
+* publish messages individually, wait for the confirmation synchronously: simple, but very
+ limited throughput.
+* publish messages in batch, wait for the confirmation synchronously for a batch: simple,
+ reasonable throughput, but hard to reason about when something goes wrong.
+* asynchronous handling: best performance and use of resources, good control in case of
+ error, but can be involved to implement correctly.
+
+### Putting it all together
+
+The full example code:
+
+```kotlin
+import dev.kourier.amqp.AMQPResponse
+import dev.kourier.amqp.Properties
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.flow.take
+import kotlinx.coroutines.flow.toList
+
+suspend fun publishMessagesIndividually(channel: AMQPChannel, messages: List) {
+ channel.confirmSelect()
+
+ for (message in messages) {
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "my_queue",
+ properties = Properties()
+ )
+
+ val confirm = channel.publishConfirmResponses.first()
+
+ when (confirm) {
+ is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
+ println("✓ Message confirmed: $message")
+ }
+ is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
+ println("✗ Message rejected: $message")
+ }
+ }
+ }
+}
+
+suspend fun publishMessagesInBatch(channel: AMQPChannel, messages: List, batchSize: Int) {
+ channel.confirmSelect()
+
+ messages.chunked(batchSize).forEach { batch ->
+ batch.forEach { message ->
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "my_queue",
+ properties = Properties()
+ )
+ }
+
+ val confirms = channel.publishConfirmResponses.take(batch.size).toList()
+
+ val ackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Ack }
+ val nackCount = confirms.count { it is AMQPResponse.Channel.Basic.PublishConfirm.Nack }
+
+ println("Batch complete: $ackCount acks, $nackCount nacks")
+
+ if (nackCount > 0) {
+ println("Warning: Some messages in batch were rejected")
+ }
+ }
+}
+
+suspend fun publishMessagesAsync(channel: AMQPChannel, messages: List) {
+ channel.confirmSelect()
+
+ val outstandingConfirms = mutableMapOf()
+ var nextDeliveryTag = 1UL
+
+ val confirmJob = launch {
+ channel.publishConfirmResponses.collect { confirm ->
+ when (confirm) {
+ is AMQPResponse.Channel.Basic.PublishConfirm.Ack -> {
+ if (confirm.multiple) {
+ outstandingConfirms.keys.filter { it <= confirm.deliveryTag }
+ .forEach { outstandingConfirms.remove(it) }
+ } else {
+ outstandingConfirms.remove(confirm.deliveryTag)
+ }
+ }
+ is AMQPResponse.Channel.Basic.PublishConfirm.Nack -> {
+ val message = outstandingConfirms[confirm.deliveryTag]
+ println("✗ Message nacked: $message")
+ outstandingConfirms.remove(confirm.deliveryTag)
+ }
+ }
+ }
+ }
+
+ messages.forEach { message ->
+ outstandingConfirms[nextDeliveryTag] = message
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "my_queue",
+ properties = Properties()
+ )
+
+ nextDeliveryTag++
+ }
+
+ while (outstandingConfirms.isNotEmpty()) {
+ delay(10)
+ }
+
+ confirmJob.cancel()
+}
+
+fun main() = runBlocking {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(this, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare("my_queue", false, false, true, emptyMap())
+
+ val messages = List(1000) { "Message $it" }
+
+ val startTime = System.currentTimeMillis()
+ publishMessagesAsync(channel, messages)
+ val duration = System.currentTimeMillis() - startTime
+
+ println("Published ${messages.size} messages in ${duration}ms")
+
+ channel.close()
+ connection.close()
+}
+```
+
+This tutorial is now complete. Note that publisher confirms is an advanced feature
+and may not be necessary for all applications. For more information on publisher
+confirms and other reliability features, see the [documentation on reliability](/docs/confirms).
diff --git a/tutorials/tutorial-six-kotlin.md b/tutorials/tutorial-six-kotlin.md
new file mode 100644
index 000000000..4ade9ba95
--- /dev/null
+++ b/tutorials/tutorial-six-kotlin.md
@@ -0,0 +1,379 @@
+---
+title: RabbitMQ tutorial - Remote procedure call (RPC)
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+import T6DiagramFull from '@site/src/components/Tutorials/T6DiagramFull.md';
+
+# RabbitMQ tutorial - Remote procedure call (RPC)
+
+## Remote procedure call (RPC)
+### (using the Kotlin Client)
+
+
+
+
+In the [second tutorial](./tutorial-two-kotlin) we learned how to
+use _Work Queues_ to distribute time-consuming tasks among multiple
+workers.
+
+But what if we need to run a function on a remote computer and wait for
+the result? Well, that's a different story. This pattern is commonly
+known as _Remote Procedure Call_ or _RPC_.
+
+In this tutorial we're going to use RabbitMQ to build an RPC system: a
+client and a scalable RPC server. As we don't have any time-consuming
+tasks that are worth distributing, we're going to create a dummy RPC
+service that returns Fibonacci numbers.
+
+Client interface
+----------------
+
+To illustrate how an RPC service could be used we're going to
+create a simple client function. It will send an RPC
+request and wait until the answer is received:
+
+```kotlin
+val result = rpcClient(this, 30)
+println("fib(30) = $result")
+```
+
+> #### A note on RPC
+>
+> Although RPC is a pretty common pattern in computing, it's often criticised.
+> The problems arise when a programmer is not aware whether a function call
+> is local or if it's a slow RPC. Confusions like that result in an
+> unpredictable system and adds unnecessary complexity to debugging.
+> Instead of simplifying software, misused RPC can result in unmaintainable
+> spaghetti code.
+>
+> Bearing that in mind, consider the following advice:
+>
+> * Make sure it's obvious which function call is local and which is remote.
+> * Document your system. Make the dependencies between components clear.
+> * Handle error cases. How should the client react when the RPC server is
+> down for a long time?
+>
+> When in doubt avoid RPC. If you can, you should use an asynchronous
+> pipeline - instead of RPC-like blocking, results are asynchronously
+> pushed to a next computation stage.
+
+
+Callback queue
+--------------
+
+In general doing RPC over RabbitMQ is easy. A client sends a request
+message and a server replies with a response message. In order to
+receive a response we need to send a 'callback' queue address with the
+request. We can use the default queue. Let's try it:
+
+```kotlin
+val callbackQueueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+)
+val callbackQueueName = callbackQueueDeclared.queueName
+
+val requestProps = properties {
+ replyTo = callbackQueueName
+}
+
+channel.basicPublish(
+ "30".toByteArray(),
+ exchange = "",
+ routingKey = "rpc_queue",
+ properties = requestProps
+)
+
+// ... then code to read a response message from the callback queue ...
+```
+
+> #### Message properties
+>
+> The AMQP 0-9-1 protocol predefines a set of 14 properties that go with
+> a message. Most of the properties are rarely used, with the exception of
+> the following:
+>
+> * `deliveryMode`: Marks a message as persistent (with a value of 2)
+> or transient (any other value).
+> * `contentType`: Used to describe the mime-type of the encoding.
+> For example for the often used JSON encoding it is a good practice
+> to set this property to: `application/json`.
+> * `replyTo`: Commonly used to name a callback queue.
+> * `correlationId`: Useful to correlate RPC responses with requests.
+
+Correlation Id
+--------------
+
+In the method presented above we suggest creating a callback queue for
+every RPC request. That's pretty inefficient, but fortunately there is
+a better way - let's create a single callback queue per client.
+
+That raises a new issue, having received a response in that queue it's
+not clear to which request the response belongs. That's when the
+`correlationId` property is used. We're going to set it to a unique
+value for every request. Later, when we receive a message in the
+callback queue we'll look at this property, and based on that we'll be
+able to match a response with a request. If we see an unknown
+`correlationId` value, we may safely discard the message - it doesn't
+belong to our requests.
+
+You may ask, why should we ignore unknown messages in the callback
+queue, rather than failing with an error? It's due to a possibility of
+a race condition on the server side. Although unlikely, it is possible
+that the RPC server will die just after sending us the answer, but
+before sending an acknowledgment message for the request. If that
+happens, the restarted RPC server will process the request again.
+That's why on the client we must handle the duplicate responses
+gracefully, and the RPC should ideally be idempotent.
+
+Summary
+-------
+
+
+
+Our RPC will work like this:
+
+ * For an RPC request, the Client sends a message with two properties:
+ `replyTo`, which is set to a callback queue created just for the request,
+ and `correlationId`, which is set to a unique value for every request.
+ * The request is sent to an `rpc_queue` queue.
+ * The RPC worker (aka: server) is waiting for requests on that queue.
+ When a request appears, it does the job and sends a message with the
+ result back to the Client, using the queue from the `replyTo` field.
+ * The client waits for data on the callback queue. When a message
+ appears, it checks the `correlationId` property. If it matches
+ the value from the request it returns the response to the application.
+
+Putting it all together
+-----------------------
+
+The Fibonacci function:
+
+```kotlin
+private fun fib(n: Int): Int {
+ return when {
+ n == 0 -> 0
+ n == 1 -> 1
+ else -> fib(n - 1) + fib(n - 2)
+ }
+}
+```
+
+We declare our fibonacci function. It assumes only valid positive integer input.
+(Don't expect this one to work for big numbers,
+and it's probably the slowest recursive implementation possible).
+
+
+The code for our RPC server:
+
+```kotlin
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import dev.kourier.amqp.properties
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun rpcServer(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ "rpc_queue",
+ durable = false,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+
+ channel.basicQos(count = 1u, global = false)
+
+ println(" [x] Awaiting RPC requests")
+
+ val consumer = channel.basicConsume("rpc_queue", noAck = false)
+
+ for (delivery in consumer) {
+ val props = delivery.message.properties
+ val correlationId = props.correlationId
+ val replyTo = props.replyTo
+
+ val requestMessage = delivery.message.body.decodeToString()
+ val n = requestMessage.toIntOrNull() ?: 0
+
+ println(" [.] fib($n)")
+ val response = fib(n)
+
+ val replyProps = properties {
+ this.correlationId = correlationId
+ }
+
+ if (replyTo != null) {
+ channel.basicPublish(
+ response.toString().toByteArray(),
+ exchange = "",
+ routingKey = replyTo,
+ properties = replyProps
+ )
+ }
+
+ channel.basicAck(delivery.message, multiple = false)
+ }
+
+ channel.close()
+ connection.close()
+}
+
+private fun fib(n: Int): Int {
+ return when {
+ n == 0 -> 0
+ n == 1 -> 1
+ else -> fib(n - 1) + fib(n - 2)
+ }
+}
+```
+
+The server code is rather straightforward:
+
+ * As usual we start by establishing the connection and declaring the queue.
+ * We might want to run more than one server process. In order to spread
+ the load equally over multiple servers we need to set the
+ `basicQos` setting.
+ * We use `basicConsume` to access the queue, where we provide the callback
+ that will do the work and send the response back.
+
+
+The code for our RPC client:
+
+```kotlin
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import dev.kourier.amqp.properties
+import kotlinx.coroutines.CoroutineScope
+import java.util.UUID
+
+suspend fun rpcClient(coroutineScope: CoroutineScope, n: Int): Int {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ val callbackQueueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+ )
+ val callbackQueueName = callbackQueueDeclared.queueName
+
+ val correlationId = UUID.randomUUID().toString()
+
+ val consumer = channel.basicConsume(callbackQueueName, noAck = true)
+ var result = 0
+
+ val requestProps = properties {
+ this.correlationId = correlationId
+ this.replyTo = callbackQueueName
+ }
+
+ channel.basicPublish(
+ n.toString().toByteArray(),
+ exchange = "",
+ routingKey = "rpc_queue",
+ properties = requestProps
+ )
+ println(" [x] Requesting fib($n)")
+
+ for (delivery in consumer) {
+ val responseCorrelationId = delivery.message.properties.correlationId
+
+ if (responseCorrelationId == correlationId) {
+ result = delivery.message.body.decodeToString().toInt()
+ println(" [.] Got $result")
+ break
+ }
+ }
+
+ channel.close()
+ connection.close()
+
+ return result
+}
+```
+
+We establish a connection and channel. We declare an exclusive callback queue
+for replies. We subscribe to the callback queue, so that we can receive RPC
+responses. We generate a unique `correlationId` number and save it. The loop
+is waiting for an appropriate response and whenever we get a response we check
+if the `correlationId` is the one we're looking for. If so, we save the response.
+
+Making the RPC request:
+
+```kotlin
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ // Start RPC server
+ launch {
+ rpcServer(this)
+ }
+
+ delay(1000)
+
+ // Make RPC call
+ val result = rpcClient(this, 30)
+ println("fib(30) = $result")
+}
+```
+
+The presented design is not the only possible implementation of an RPC
+service, but it has some important advantages:
+
+ * If the RPC server is too slow, you can scale up by just running
+ another one. Try running a second RPC server.
+ * On the client side, the RPC requires sending and receiving only one
+ message. No synchronous calls like `queueDeclare` are required. As a
+ result the RPC client needs only one network round trip for a single
+ RPC request.
+
+Our code is still pretty simplistic and doesn't try to solve more
+complex (but important) problems, like:
+
+ * How should the client react if there are no servers running?
+ * Should a client have some kind of timeout for the RPC?
+ * If the server malfunctions and raises an exception, should it be
+ forwarded to the client?
+ * Protecting against invalid incoming messages (eg checking bounds,
+ type) before processing.
+
+>If you want to experiment, you may find the [management UI](/docs/management) useful for viewing the queues.
+
+Move on to [tutorial 7](./tutorial-seven-kotlin) to learn about publisher confirms.
diff --git a/tutorials/tutorial-three-kotlin.md b/tutorials/tutorial-three-kotlin.md
new file mode 100644
index 000000000..9224e6126
--- /dev/null
+++ b/tutorials/tutorial-three-kotlin.md
@@ -0,0 +1,366 @@
+---
+title: RabbitMQ tutorial - Publish/Subscribe
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+import T3DiagramToC from '@site/src/components/Tutorials/T3DiagramToC.md';
+import T3DiagramBinding from '@site/src/components/Tutorials/T3DiagramBinding.md';
+import T3DiagramFull from '@site/src/components/Tutorials/T3DiagramFull.md';
+
+# RabbitMQ tutorial - Publish/Subscribe
+
+## Publish/Subscribe
+### (using the Kotlin Client)
+
+
+
+
+
+In the [previous tutorial](./tutorial-two-kotlin) we created a work
+queue. The assumption behind a work queue is that each task is
+delivered to exactly one worker. In this part we'll do something
+completely different -- we'll deliver a message to multiple
+consumers. This pattern is known as "publish/subscribe".
+
+To illustrate the pattern, we're going to build a simple logging
+system. It will consist of two programs -- the first will emit log
+messages and the second will receive and print them.
+
+In our logging system every running copy of the receiver program will
+get the messages. That way we can run one receiver and direct the logs
+to disk; and at the same time we can run another receiver and see the
+logs on the screen.
+
+Essentially, published log messages are going to be broadcast to all
+the receivers.
+
+Exchanges
+---------
+
+In previous parts of the tutorial we sent and received messages to and
+from a queue. Now it's time to introduce the full messaging model in
+Rabbit.
+
+Let's quickly go over what we covered in the previous tutorials:
+
+* A _producer_ is a user application that sends messages.
+* A _queue_ is a buffer that stores messages.
+* A _consumer_ is a user application that receives messages.
+
+The core idea in the messaging model in RabbitMQ is that the producer
+never sends any messages directly to a queue. Actually, quite often
+the producer doesn't even know if a message will be delivered to any
+queue at all.
+
+Instead, the producer can only send messages to an _exchange_. An
+exchange is a very simple thing. On one side it receives messages from
+producers and the other side it pushes them to queues. The exchange
+must know exactly what to do with a message it receives. Should it be
+appended to a particular queue? Should it be appended to many queues?
+Or should it get discarded. The rules for that are defined by the
+_exchange type_.
+
+
+
+There are a few exchange types available: `direct`, `topic`, `headers`
+and `fanout`. We'll focus on the last one -- the fanout. Let's create
+an exchange of that type, and call it `logs`:
+
+```kotlin
+channel.exchangeDeclare(
+ "logs",
+ BuiltinExchangeType.FANOUT,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+)
+```
+
+The fanout exchange is very simple. As you can probably guess from the
+name, it just broadcasts all the messages it receives to all the
+queues it knows. And that's exactly what we need for our logger.
+
+
+> #### Listing exchanges
+>
+> To list the exchanges on the server you can run the ever useful `rabbitmqctl`:
+>
+> ```bash
+> sudo rabbitmqctl list_exchanges
+> ```
+>
+> In this list there will be some `amq.*` exchanges and the default (unnamed)
+> exchange. These are created by default, but it is unlikely you'll need to
+> use them at the moment.
+
+
+> #### The default exchange
+>
+> In previous parts of the tutorial we knew nothing about exchanges,
+> but still were able to send messages to queues. That was possible
+> because we were using a default exchange, which we identify by the empty string (`""`).
+>
+> Recall how we published a message before:
+>
+> ```kotlin
+> channel.basicPublish(
+> message.toByteArray(),
+> exchange = "",
+> routingKey = "hello",
+> properties = Properties()
+> )
+> ```
+>
+> The `exchange` parameter is the name of the exchange. The empty string
+> denotes the default or _nameless_ exchange: messages are routed to
+> the queue with the name specified by `routingKey`, if it exists.
+
+Now, we can publish to our named exchange instead:
+
+```kotlin
+channel.basicPublish(
+ message.toByteArray(),
+ exchange = "logs",
+ routingKey = "",
+ properties = Properties()
+)
+```
+
+Temporary queues
+----------------
+
+As you may remember previously we were using queues that had
+specific names (remember `hello` and `task_queue`?). Being able to name
+a queue was crucial for us -- we needed to point the workers to the
+same queue. Giving a queue a name is important when you
+want to share the queue between producers and consumers.
+
+But that's not the case for our logger. We want to hear about all
+log messages, not just a subset of them. We're
+also interested only in currently flowing messages not in the old
+ones. To solve that we need two things.
+
+Firstly, whenever we connect to Rabbit we need a fresh, empty queue.
+To do this we could create a queue with a random name, or,
+even better - let the server choose a random queue name for us.
+
+Secondly, once we disconnect the consumer the queue should be
+automatically deleted.
+
+In the Kotlin client, when we supply an empty string as the queue name, we
+create a non-durable queue with a generated name:
+
+```kotlin
+val queueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+)
+val queueName = queueDeclared.queueName
+```
+
+At this point `queueName` contains a random queue name. For example
+it may look like `amq.gen-JzTY20BRgKO-HjmUJj0wLg`.
+
+When the connection that declared it closes, the queue will be deleted
+because it is declared as exclusive.
+
+Bindings
+--------
+
+
+
+We've already created a fanout exchange and a queue. Now we need to
+tell the exchange to send messages to our queue. That relationship
+between exchange and a queue is called a _binding_.
+
+```kotlin
+channel.queueBind(
+ queue = queueName,
+ exchange = "logs",
+ routingKey = ""
+)
+```
+
+From now on the `logs` exchange will append messages to our queue.
+
+> #### Listing bindings
+>
+> You can list existing bindings using, you guessed it,
+> ```bash
+> rabbitmqctl list_bindings
+> ```
+
+Putting it all together
+-----------------------
+
+
+
+The producer program, which emits log messages, doesn't look much
+different from the previous tutorial. The most important change is that
+we now want to publish messages to our `logs` exchange instead of the
+nameless one. We need to supply a `routingKey` when sending, but its
+value is ignored for `fanout` exchanges. Here goes the code for
+`emitLog` function:
+
+```kotlin
+import dev.kourier.amqp.BuiltinExchangeType
+import dev.kourier.amqp.Properties
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun emitLog(coroutineScope: CoroutineScope, message: String) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.exchangeDeclare(
+ "logs",
+ BuiltinExchangeType.FANOUT,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+ )
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "logs",
+ routingKey = "",
+ properties = Properties()
+ )
+ println(" [x] Sent '$message'")
+
+ channel.close()
+ connection.close()
+}
+```
+
+As you can see, after establishing the connection we declared the
+exchange. This step is necessary as publishing to a non-existing
+exchange is forbidden.
+
+The messages will be lost if no queue is bound to the exchange yet,
+but that's okay for us; if no consumer is listening yet we can safely discard the message.
+
+The code for `receiveLogs`:
+
+```kotlin
+import dev.kourier.amqp.BuiltinExchangeType
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun receiveLogs(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.exchangeDeclare(
+ "logs",
+ BuiltinExchangeType.FANOUT,
+ durable = false,
+ autoDelete = false,
+ internal = false,
+ arguments = emptyMap()
+ )
+
+ val queueDeclared = channel.queueDeclare(
+ name = "",
+ durable = false,
+ exclusive = true,
+ autoDelete = true,
+ arguments = emptyMap()
+ )
+ val queueName = queueDeclared.queueName
+
+ channel.queueBind(
+ queue = queueName,
+ exchange = "logs",
+ routingKey = ""
+ )
+
+ println(" [*] Waiting for logs. To exit press CTRL+C")
+
+ val consumer = channel.basicConsume(queueName, noAck = true)
+
+ for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ println(" [x] $message")
+ }
+
+ channel.close()
+ connection.close()
+}
+```
+
+We're done. If you want to save logs to a file, just open a console and run:
+
+```kotlin
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ launch { receiveLogs(this) }
+ // In practice, you would redirect output to a file
+}
+```
+
+If you wish to see the logs on your screen, spawn a new terminal and run the receiver again.
+
+And of course, to emit logs:
+
+```kotlin
+import kotlinx.coroutines.runBlocking
+
+fun main() = runBlocking {
+ emitLog(this, "info: Application started")
+}
+```
+
+Using `rabbitmqctl list_bindings` you can verify that the code actually
+creates bindings and queues as we want. With two `receiveLogs` programs
+running you should see something like:
+
+```bash
+sudo rabbitmqctl list_bindings
+# => Listing bindings ...
+# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
+# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
+# => ...done.
+```
+
+The interpretation of the result is straightforward: data from
+exchange `logs` goes to two queues with server-assigned names. And
+that's exactly what we intended.
+
+To find out how to listen for a subset of messages, let's move on to
+[tutorial 4](./tutorial-four-kotlin)
diff --git a/tutorials/tutorial-two-kotlin.md b/tutorials/tutorial-two-kotlin.md
new file mode 100644
index 000000000..de01cb030
--- /dev/null
+++ b/tutorials/tutorial-two-kotlin.md
@@ -0,0 +1,456 @@
+---
+title: RabbitMQ tutorial - Work Queues
+---
+
+
+import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md';
+import T2DiagramToC from '@site/src/components/Tutorials/T2DiagramToC.md';
+import T2DiagramPrefetch from '@site/src/components/Tutorials/T2DiagramPrefetch.md';
+
+# RabbitMQ tutorial - Work Queues
+
+## Work Queues
+### (using the Kotlin Client)
+
+
+
+
+
+In the [first tutorial](./tutorial-one-kotlin) we
+wrote programs to send and receive messages from a named queue. In this
+one we'll create a _Work Queue_ that will be used to distribute
+time-consuming tasks among multiple workers.
+
+The main idea behind Work Queues (aka: _Task Queues_) is to avoid
+doing a resource-intensive task immediately and having to wait for
+it to complete. Instead we schedule the task to be done later. We encapsulate a
+_task_ as a message and send it to a queue. A worker process running
+in the background will pop the tasks and eventually execute the
+job. When you run many workers the tasks will be shared between them.
+
+This concept is especially useful in web applications where it's
+impossible to handle a complex task during a short HTTP request
+window.
+
+Preparation
+-----------
+
+In the previous part of this tutorial we sent a message containing
+"Hello World!". Now we'll be sending strings that stand for complex
+tasks. We don't have a real-world task, like images to be resized or
+pdf files to be rendered, so let's fake it by just pretending we're
+busy - by using the `delay()` function. We'll take the number of dots
+in the string as its complexity; every dot will account for one second
+of "work". For example, a fake task described by `Hello...`
+will take three seconds.
+
+We will slightly modify the `send` function from our previous example,
+to allow messages to be sent as tasks. This
+program will schedule tasks to our work queue, so let's name it
+`newTask`:
+
+```kotlin
+suspend fun newTask(coroutineScope: CoroutineScope, message: String) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ "task_queue",
+ durable = true,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+
+ val properties = properties {
+ deliveryMode = 2u // Persistent message
+ }
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "task_queue",
+ properties = properties
+ )
+ println(" [x] Sent '$message'")
+
+ channel.close()
+ connection.close()
+}
+```
+
+Our old `receive` function also requires some changes: it needs to
+fake a second of work for every dot in the message body. It will handle
+delivered messages and perform the task, so let's call it `worker`:
+
+```kotlin
+suspend fun worker(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ "task_queue",
+ durable = true,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+ println(" [*] Waiting for messages. To exit press CTRL+C")
+
+ val consumer = channel.basicConsume("task_queue", noAck = true)
+
+ for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ println(" [x] Received '$message'")
+
+ try {
+ doWork(message)
+ } finally {
+ println(" [x] Done")
+ }
+ }
+
+ channel.close()
+ connection.close()
+}
+```
+
+Our fake task to simulate execution time:
+
+```kotlin
+private suspend fun doWork(task: String) {
+ for (ch in task) {
+ if (ch == '.') {
+ delay(1000) // Sleep for 1 second per dot
+ }
+ }
+}
+```
+
+Round-robin dispatching
+------------------------
+
+One of the advantages of using a Task Queue is the ability to easily
+parallelise work. If we are building up a backlog of work, we can just
+add more workers and that way, scale easily.
+
+By default, RabbitMQ will send each message to the next consumer,
+in sequence. On average every consumer will get the same number of
+messages. This way of distributing messages is called round-robin.
+
+Message acknowledgment
+----------------------
+
+Doing a task can take a few seconds, you may wonder what happens if
+a consumer starts a long task and it terminates before it completes.
+With our current code, once RabbitMQ delivers a message to the consumer, it
+immediately marks it for deletion. In this case, if you terminate a worker,
+the message it was just processing is lost. The messages that were dispatched
+to this particular worker but were not yet handled are also lost.
+
+But we don't want to lose any tasks. If a worker dies, we'd like the
+task to be delivered to another worker.
+
+In order to make sure a message is never lost, RabbitMQ supports
+[message _acknowledgments_](/docs/confirms). An ack(nowledgement) is sent back by the
+consumer to tell RabbitMQ that a particular message has been received,
+processed and that RabbitMQ is free to delete it.
+
+If a consumer dies (its channel is closed, connection is closed, or
+TCP connection is lost) without sending an ack, RabbitMQ will
+understand that a message wasn't processed fully and will re-queue it.
+If there are other consumers online at the same time, it will then quickly
+redeliver it to another consumer. That way you can be sure that no
+message is lost, even if the workers occasionally die.
+
+A timeout (30 minutes by default) is enforced on consumer delivery acknowledgement.
+This helps detect buggy (stuck) consumers that never acknowledge deliveries.
+You can increase this timeout as described in
+[Delivery Acknowledgement Timeout](/docs/consumers#acknowledgement-timeout).
+
+[Manual message acknowledgments](/docs/confirms) are turned off by default in the previous examples.
+It's time to turn them on using the `noAck = false` flag and send a proper acknowledgment
+from the worker, once we're done with a task.
+
+```kotlin
+channel.basicQos(count = 1u, global = false)
+
+val consumer = channel.basicConsume("task_queue", noAck = false)
+
+for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ println(" [x] Received '$message'")
+
+ try {
+ doWork(message)
+ println(" [x] Done")
+ } finally {
+ channel.basicAck(delivery.message, multiple = false)
+ }
+}
+```
+
+Using this code, you can ensure that even if you terminate a worker using
+CTRL+C while it was processing a message, nothing is lost. Soon
+after the worker terminates, all unacknowledged messages are redelivered.
+
+Acknowledgement must be sent on the same channel that received the
+delivery. Attempts to acknowledge using a different channel will result
+in a channel-level protocol exception. See the [doc guide on confirmations](/docs/confirms)
+to learn more.
+
+> #### Forgotten acknowledgment
+>
+> It's a common mistake to miss the `basicAck`. It's an easy error,
+> but the consequences are serious. Messages will be redelivered
+> when your client quits (which may look like random redelivery), but
+> RabbitMQ will eat more and more memory as it won't be able to release
+> any unacked messages.
+>
+> In order to debug this kind of mistake you can use `rabbitmqctl`
+> to print the `messages_unacknowledged` field:
+>
+> ```bash
+> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
+> ```
+>
+> On Windows, drop the sudo:
+> ```bash
+> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
+> ```
+
+Message durability
+------------------
+
+We have learned how to make sure that even if the consumer dies, the
+task isn't lost. But our tasks will still be lost if RabbitMQ server stops.
+
+When RabbitMQ quits or crashes it will forget the queues and messages
+unless you tell it not to. Two things are required to make sure that
+messages aren't lost: we need to mark both the queue and messages as
+durable.
+
+First, we need to make sure that the queue will survive a RabbitMQ node restart.
+In order to do so, we need to declare it as _durable_:
+
+```kotlin
+channel.queueDeclare(
+ "task_queue",
+ durable = true,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+)
+```
+
+Although this command is correct by itself, it won't work in our present
+setup. That's because we've already defined a queue called `hello`
+which is not durable. RabbitMQ doesn't allow you to redefine an existing queue
+with different parameters and will return an error to any program
+that tries to do that. But there is a quick workaround - let's declare
+a queue with different name, for example `task_queue`:
+
+```kotlin
+channel.queueDeclare(
+ "task_queue",
+ durable = true,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+)
+```
+
+This `durable` option change needs to be applied to both the producer
+and consumer code.
+
+At this point we're sure that the `task_queue` queue won't be lost
+even if RabbitMQ restarts. Now we need to mark our messages as persistent
+- by setting `deliveryMode` property to `2u`.
+
+```kotlin
+val properties = properties {
+ deliveryMode = 2u // Persistent message
+}
+
+channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "task_queue",
+ properties = properties
+)
+```
+
+> #### Note on message persistence
+>
+> Marking messages as persistent doesn't fully guarantee that a message
+> won't be lost. Although it tells RabbitMQ to save the message to disk,
+> there is still a short time window when RabbitMQ has accepted a message and
+> hasn't saved it yet. Also, RabbitMQ doesn't do `fsync(2)` for every
+> message -- it may be just saved to cache and not really written to the
+> disk. The persistence guarantees aren't strong, but it's more than enough
+> for our simple task queue. If you need a stronger guarantee then you can use
+> [publisher confirms](/docs/confirms).
+
+Fair dispatch
+-------------
+
+
+
+You might have noticed that the dispatching still doesn't work exactly
+as we want. For example in a situation with two workers, when all
+odd messages are heavy and even messages are light, one worker will be
+constantly busy and the other one will do hardly any work. Well,
+RabbitMQ doesn't know anything about that and will still dispatch
+messages evenly.
+
+This happens because RabbitMQ just dispatches a message when the message
+enters the queue. It doesn't look at the number of unacknowledged
+messages for a consumer. It just blindly dispatches every n-th message
+to the n-th consumer.
+
+In order to defeat that we can use the `basicQos` method with the
+`count = 1u` setting. This tells RabbitMQ not to give more than
+one message to a worker at a time. Or, in other words, don't dispatch
+a new message to a worker until it has processed and acknowledged the
+previous one. Instead, it will dispatch it to the next worker that is not still busy.
+
+```kotlin
+channel.basicQos(count = 1u, global = false)
+```
+
+> #### Note about queue size
+>
+> If all the workers are busy, your queue can fill up. You will want to keep an
+> eye on that, and maybe add more workers, or use [message TTL](/docs/ttl).
+
+Putting it all together
+-----------------------
+
+Final code of our `newTask` function:
+
+```kotlin
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import dev.kourier.amqp.properties
+import kotlinx.coroutines.CoroutineScope
+
+suspend fun newTask(coroutineScope: CoroutineScope, message: String) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ "task_queue",
+ durable = true,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+
+ val properties = properties {
+ deliveryMode = 2u // Persistent message
+ }
+
+ channel.basicPublish(
+ message.toByteArray(),
+ exchange = "",
+ routingKey = "task_queue",
+ properties = properties
+ )
+ println(" [x] Sent '$message'")
+
+ channel.close()
+ connection.close()
+}
+```
+
+And our `worker`:
+
+```kotlin
+import dev.kourier.amqp.connection.amqpConfig
+import dev.kourier.amqp.connection.createAMQPConnection
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.delay
+
+suspend fun worker(coroutineScope: CoroutineScope) {
+ val config = amqpConfig {
+ server {
+ host = "localhost"
+ }
+ }
+ val connection = createAMQPConnection(coroutineScope, config)
+ val channel = connection.openChannel()
+
+ channel.queueDeclare(
+ "task_queue",
+ durable = true,
+ exclusive = false,
+ autoDelete = false,
+ arguments = emptyMap()
+ )
+ println(" [*] Waiting for messages. To exit press CTRL+C")
+
+ channel.basicQos(count = 1u, global = false)
+
+ val consumer = channel.basicConsume("task_queue", noAck = false)
+
+ for (delivery in consumer) {
+ val message = delivery.message.body.decodeToString()
+ println(" [x] Received '$message'")
+
+ try {
+ doWork(message)
+ println(" [x] Done")
+ } finally {
+ channel.basicAck(delivery.message, multiple = false)
+ }
+ }
+
+ channel.close()
+ connection.close()
+}
+
+private suspend fun doWork(task: String) {
+ for (ch in task) {
+ if (ch == '.') {
+ delay(1000) // Sleep for 1 second per dot
+ }
+ }
+}
+```
+
+Using message acknowledgments and `basicQos` you can set up a
+work queue. The durability options let the tasks survive even if
+RabbitMQ is restarted.
+
+Now we can move on to [tutorial 3](./tutorial-three-kotlin) and learn how
+to deliver the same message to many consumers.