|
| 1 | +--- |
| 2 | +title: RabbitMQ tutorial - Topics |
| 3 | +--- |
| 4 | +<!-- |
| 5 | +Copyright (c) 2005-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. |
| 6 | +
|
| 7 | +All rights reserved. This program and the accompanying materials |
| 8 | +are made available under the terms of the under the Apache License, |
| 9 | +Version 2.0 (the "License"); you may not use this file except in compliance |
| 10 | +with the License. You may obtain a copy of the License at |
| 11 | +
|
| 12 | +https://www.apache.org/licenses/LICENSE-2.0 |
| 13 | +
|
| 14 | +Unless required by applicable law or agreed to in writing, software |
| 15 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 16 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 17 | +See the License for the specific language governing permissions and |
| 18 | +limitations under the License. |
| 19 | +--> |
| 20 | + |
| 21 | +import TutorialsHelp from '@site/src/components/Tutorials/TutorialsHelp.md'; |
| 22 | +import T5DiagramToC from '@site/src/components/Tutorials/T5DiagramToC.md'; |
| 23 | +import T5DiagramTopicX from '@site/src/components/Tutorials/T5DiagramTopicX.md'; |
| 24 | + |
| 25 | +# RabbitMQ tutorial - Topics |
| 26 | + |
| 27 | +## Topics |
| 28 | +### (using the Kotlin Client) |
| 29 | + |
| 30 | +<TutorialsHelp/> |
| 31 | + |
| 32 | +<T5DiagramToC/> |
| 33 | + |
| 34 | +In the [previous tutorial](./tutorial-four-kotlin) we improved our |
| 35 | +logging system. Instead of using a `fanout` exchange only capable of |
| 36 | +dummy broadcasting, we used a `direct` one, and gained a possibility |
| 37 | +of selectively receiving the logs. |
| 38 | + |
| 39 | +Although using the `direct` exchange improved our system, it still has |
| 40 | +limitations - it can't do routing based on multiple criteria. |
| 41 | + |
| 42 | +In our logging system we might want to subscribe to not only logs |
| 43 | +based on severity, but also based on the source which emitted the log. |
| 44 | +You might know this concept from the |
| 45 | +[`syslog`](http://en.wikipedia.org/wiki/Syslog) unix tool, which |
| 46 | +routes logs based on both severity (info/warn/crit...) and facility |
| 47 | +(auth/cron/kern...). |
| 48 | + |
| 49 | +That would give us a lot of flexibility - we may want to listen to just |
| 50 | +critical errors coming from 'cron' but also all logs from 'kern'. |
| 51 | + |
| 52 | +To implement that in our logging system we need to learn about a more |
| 53 | +complex `topic` exchange. |
| 54 | + |
| 55 | + |
| 56 | +Topic exchange |
| 57 | +-------------- |
| 58 | + |
| 59 | +Messages sent to a `topic` exchange can't have an arbitrary |
| 60 | +`routing_key` - it must be a list of words, delimited by dots. The |
| 61 | +words can be anything, but usually they specify some features |
| 62 | +connected to the message. A few valid routing key examples: |
| 63 | +"`stock.usd.nyse`", "`nyse.vmw`", "`quick.orange.rabbit`". There can be as |
| 64 | +many words in the routing key as you like, up to the limit of 255 |
| 65 | +bytes. |
| 66 | + |
| 67 | +The binding key must also be in the same form. The logic behind the |
| 68 | +`topic` exchange is similar to a `direct` one - a message sent with a |
| 69 | +particular routing key will be delivered to all the queues that are |
| 70 | +bound with a matching binding key. However there are two important |
| 71 | +special cases for binding keys: |
| 72 | + |
| 73 | + * `*` (star) can substitute for exactly one word. |
| 74 | + * `#` (hash) can substitute for zero or more words. |
| 75 | + |
| 76 | +It's easiest to explain this in an example: |
| 77 | + |
| 78 | +<T5DiagramTopicX/> |
| 79 | + |
| 80 | +In this example, we're going to send messages which all describe |
| 81 | +animals. The messages will be sent with a routing key that consists of |
| 82 | +three words (two dots). The first word in the routing key will |
| 83 | +describe speed, second a colour and third a species: |
| 84 | +"`<speed>.<colour>.<species>`". |
| 85 | + |
| 86 | +We created three bindings: Q1 is bound with binding key "`*.orange.*`" |
| 87 | +and Q2 with "`*.*.rabbit`" and "`lazy.#`". |
| 88 | + |
| 89 | +These bindings can be summarised as: |
| 90 | + |
| 91 | + * Q1 is interested in all the orange animals. |
| 92 | + * Q2 wants to hear everything about rabbits, and everything about lazy |
| 93 | + animals. |
| 94 | + |
| 95 | +A message with a routing key set to "`quick.orange.rabbit`" |
| 96 | +will be delivered to both queues. Message "`lazy.orange.elephant`" |
| 97 | +also will go to both of them. On the other hand |
| 98 | +"`quick.orange.fox`" will only go to the first queue, and |
| 99 | +"`lazy.brown.fox`" only to the second. "`lazy.pink.rabbit`" will |
| 100 | +be delivered to the second queue only once, even though it matches two bindings. |
| 101 | +"`quick.brown.fox`" doesn't match any binding so it will be discarded. |
| 102 | + |
| 103 | +What happens if we break our contract and send a message with one or |
| 104 | +four words, like "`orange`" or "`quick.orange.male.rabbit`"? Well, |
| 105 | +these messages won't match any bindings and will be lost. |
| 106 | + |
| 107 | +On the other hand "`lazy.orange.male.rabbit`", even though it has four |
| 108 | +words, will match the last binding and will be delivered to the second |
| 109 | +queue. |
| 110 | + |
| 111 | +> #### Topic exchange |
| 112 | +> |
| 113 | +> Topic exchange is powerful and can behave like other exchanges. |
| 114 | +> |
| 115 | +> When a queue is bound with "`#`" (hash) binding key - it will receive |
| 116 | +> all the messages, regardless of the routing key - like in `fanout` exchange. |
| 117 | +> |
| 118 | +> When special characters "`*`" (star) and "`#`" (hash) aren't used in bindings, |
| 119 | +> the topic exchange will behave just like a `direct` one. |
| 120 | +
|
| 121 | +Putting it all together |
| 122 | +----------------------- |
| 123 | + |
| 124 | +We're going to use a `topic` exchange in our logging system. We'll |
| 125 | +start off with a working assumption that the routing keys of logs will |
| 126 | +have two words: "`<facility>.<severity>`". |
| 127 | + |
| 128 | +The code is almost the same as in the |
| 129 | +[previous tutorial](./tutorial-four-kotlin). |
| 130 | + |
| 131 | +The code for `emitLogTopic`: |
| 132 | + |
| 133 | +```kotlin |
| 134 | +import dev.kourier.amqp.BuiltinExchangeType |
| 135 | +import dev.kourier.amqp.Properties |
| 136 | +import dev.kourier.amqp.connection.amqpConfig |
| 137 | +import dev.kourier.amqp.connection.createAMQPConnection |
| 138 | +import kotlinx.coroutines.CoroutineScope |
| 139 | + |
| 140 | +suspend fun emitLogTopic(coroutineScope: CoroutineScope, routingKey: String, message: String) { |
| 141 | + val config = amqpConfig { |
| 142 | + server { |
| 143 | + host = "localhost" |
| 144 | + } |
| 145 | + } |
| 146 | + val connection = createAMQPConnection(coroutineScope, config) |
| 147 | + val channel = connection.openChannel() |
| 148 | + |
| 149 | + channel.exchangeDeclare( |
| 150 | + "topic_logs", |
| 151 | + BuiltinExchangeType.TOPIC, |
| 152 | + durable = false, |
| 153 | + autoDelete = false, |
| 154 | + internal = false, |
| 155 | + arguments = emptyMap() |
| 156 | + ) |
| 157 | + |
| 158 | + channel.basicPublish( |
| 159 | + message.toByteArray(), |
| 160 | + exchange = "topic_logs", |
| 161 | + routingKey = routingKey, |
| 162 | + properties = Properties() |
| 163 | + ) |
| 164 | + println(" [x] Sent '$routingKey':'$message'") |
| 165 | + |
| 166 | + channel.close() |
| 167 | + connection.close() |
| 168 | +} |
| 169 | +``` |
| 170 | + |
| 171 | +The code for `receiveLogsTopic`: |
| 172 | + |
| 173 | +```kotlin |
| 174 | +import dev.kourier.amqp.BuiltinExchangeType |
| 175 | +import dev.kourier.amqp.connection.amqpConfig |
| 176 | +import dev.kourier.amqp.connection.createAMQPConnection |
| 177 | +import kotlinx.coroutines.CoroutineScope |
| 178 | + |
| 179 | +suspend fun receiveLogsTopic( |
| 180 | + coroutineScope: CoroutineScope, |
| 181 | + bindingKeys: List<String> |
| 182 | +) { |
| 183 | + val config = amqpConfig { |
| 184 | + server { |
| 185 | + host = "localhost" |
| 186 | + } |
| 187 | + } |
| 188 | + val connection = createAMQPConnection(coroutineScope, config) |
| 189 | + val channel = connection.openChannel() |
| 190 | + |
| 191 | + channel.exchangeDeclare( |
| 192 | + "topic_logs", |
| 193 | + BuiltinExchangeType.TOPIC, |
| 194 | + durable = false, |
| 195 | + autoDelete = false, |
| 196 | + internal = false, |
| 197 | + arguments = emptyMap() |
| 198 | + ) |
| 199 | + |
| 200 | + val queueDeclared = channel.queueDeclare( |
| 201 | + name = "", |
| 202 | + durable = false, |
| 203 | + exclusive = true, |
| 204 | + autoDelete = true, |
| 205 | + arguments = emptyMap() |
| 206 | + ) |
| 207 | + val queueName = queueDeclared.queueName |
| 208 | + |
| 209 | + if (bindingKeys.isEmpty()) { |
| 210 | + System.err.println("Usage: receiveLogsTopic [binding_key]...") |
| 211 | + return |
| 212 | + } |
| 213 | + |
| 214 | + for (bindingKey in bindingKeys) { |
| 215 | + channel.queueBind( |
| 216 | + queue = queueName, |
| 217 | + exchange = "topic_logs", |
| 218 | + routingKey = bindingKey |
| 219 | + ) |
| 220 | + } |
| 221 | + |
| 222 | + println(" [*] Waiting for logs. To exit press CTRL+C") |
| 223 | + |
| 224 | + val consumer = channel.basicConsume(queueName, noAck = true) |
| 225 | + |
| 226 | + for (delivery in consumer) { |
| 227 | + val message = delivery.message.body.decodeToString() |
| 228 | + val routingKey = delivery.message.routingKey |
| 229 | + println(" [x] Received '$routingKey':'$message'") |
| 230 | + } |
| 231 | + |
| 232 | + channel.close() |
| 233 | + connection.close() |
| 234 | +} |
| 235 | +``` |
| 236 | + |
| 237 | +To receive all the logs: |
| 238 | + |
| 239 | +```kotlin |
| 240 | +import kotlinx.coroutines.runBlocking |
| 241 | + |
| 242 | +fun main() = runBlocking { |
| 243 | + receiveLogsTopic(this, listOf("#")) |
| 244 | +} |
| 245 | +``` |
| 246 | + |
| 247 | +To receive all logs from the facility "`kern`": |
| 248 | + |
| 249 | +```kotlin |
| 250 | +import kotlinx.coroutines.runBlocking |
| 251 | + |
| 252 | +fun main() = runBlocking { |
| 253 | + receiveLogsTopic(this, listOf("kern.*")) |
| 254 | +} |
| 255 | +``` |
| 256 | + |
| 257 | +Or if you want to hear only about "`critical`" logs: |
| 258 | + |
| 259 | +```kotlin |
| 260 | +import kotlinx.coroutines.runBlocking |
| 261 | + |
| 262 | +fun main() = runBlocking { |
| 263 | + receiveLogsTopic(this, listOf("*.critical")) |
| 264 | +} |
| 265 | +``` |
| 266 | + |
| 267 | +You can create multiple bindings: |
| 268 | + |
| 269 | +```kotlin |
| 270 | +import kotlinx.coroutines.runBlocking |
| 271 | + |
| 272 | +fun main() = runBlocking { |
| 273 | + receiveLogsTopic(this, listOf("kern.*", "*.critical")) |
| 274 | +} |
| 275 | +``` |
| 276 | + |
| 277 | +And to emit a log with a routing key "`kern.critical`" type: |
| 278 | + |
| 279 | +```kotlin |
| 280 | +import kotlinx.coroutines.runBlocking |
| 281 | + |
| 282 | +fun main() = runBlocking { |
| 283 | + emitLogTopic(this, "kern.critical", "A critical kernel error") |
| 284 | +} |
| 285 | +``` |
| 286 | + |
| 287 | +Have fun playing with these programs. Note that the code doesn't make |
| 288 | +any assumption about the routing or binding keys, you may want to play |
| 289 | +with more than two routing key parameters. |
| 290 | + |
| 291 | +Move on to [tutorial 6](./tutorial-six-kotlin) to find out how to do a |
| 292 | +round trip message as a remote procedure call. |
0 commit comments