RabbitMQ Topic Exchange with Golang: Beginner's Guide


A Topic Exchange is another important exchange type in RabbitMQ. Unlike Direct and Fanout Exchanges, a Topic Exchange allows routing of messages to queues based on wildcard matching between the routing key and the binding key. This type of exchange is particularly useful for pattern-based routing.
In this article, we'll cover:
What is a Topic Exchange?
How it works with a flowchart.
Implementing Topic Exchange with Golang.
Detailed line-by-line explanation of the producer and consumer code.
What is a Topic Exchange?
A Topic Exchange routes messages to queues based on a routing key pattern. The routing key is a string, and queues can be bound to the exchange using wildcard patterns. Wildcards allow for flexibility in routing messages to multiple queues.
Producer sends messages to the exchange with a routing key.
Exchange matches the routing key to the binding key of the queues using a wildcard matching pattern.
Consumer reads the messages from the queue.
Flowchart: How Topic Exchange Works
The Topic Exchange compares routing keys like animal.dog.barking against binding keys like animal.*.* to decide where each message goes. Both kinds of key are lists of words separated by dots. In a binding key, the wildcard * matches exactly one word, and # matches zero or more words.
                 +-------------------+
                 |     Producer      |
                 +-------------------+
                           |
                           |
                +--------------------+
                |   Topic Exchange   |
                +--------------------+
                 /         |         \
          ______/          |          \______
         /                 |                 \
   +---------+        +---------+        +---------+
   | Queue 1 |        | Queue 2 |        | Queue 3 |
   +---------+        +---------+        +---------+
        |                  |                  |
 +------------+     +------------+     +------------+
 | Consumer 1 |     | Consumer 2 |     | Consumer 3 |
 +------------+     +------------+     +------------+
Queue 1 is bound with the binding key animal.dog.* and will receive messages such as animal.dog.barking.
Queue 2 is bound with the binding key animal.*.barking and will receive messages such as animal.dog.barking or animal.cat.barking.
Queue 3 is bound with the binding key animal.# and will receive any message whose routing key starts with the word animal.
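To make the wildcard rules concrete, here is a minimal sketch of topic matching in plain Go. It is not RabbitMQ's actual implementation, and the function names topicMatch and matchWords are hypothetical; it only mirrors the rules described above (* is exactly one word, # is zero or more words) for the three bindings in the flowchart.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether routingKey matches bindingKey under the
// topic rules described above: "*" is exactly one word, "#" is zero or more.
// Hypothetical helper for illustration; the broker does this work for you.
func topicMatch(bindingKey, routingKey string) bool {
	return matchWords(strings.Split(bindingKey, "."), strings.Split(routingKey, "."))
}

// matchWords compares the two keys word by word.
func matchWords(pattern, words []string) bool {
	if len(pattern) == 0 {
		return len(words) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" either consumes nothing, or consumes one word and stays active.
		if matchWords(pattern[1:], words) {
			return true
		}
		return len(words) > 0 && matchWords(pattern, words[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(words) > 0 && matchWords(pattern[1:], words[1:])
	default:
		// A literal word must be equal to the current routing key word.
		return len(words) > 0 && pattern[0] == words[0] && matchWords(pattern[1:], words[1:])
	}
}

func main() {
	bindings := []string{"animal.dog.*", "animal.*.barking", "animal.#"}
	routingKeys := []string{"animal.dog.barking", "animal.cat.barking", "animal"}
	for _, b := range bindings {
		for _, k := range routingKeys {
			fmt.Printf("%-18s %-20s %v\n", b, k, topicMatch(b, k))
		}
	}
}
```

Note that animal.# also matches the bare routing key animal, because # is allowed to match zero words, while animal.dog.* requires a third word to be present.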
Prerequisites
Install RabbitMQ.
Install Go.
Install the RabbitMQ client library for Go:
go get github.com/streadway/amqp
Producer Code
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// Step 1: Connect to RabbitMQ Server
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Step 2: Create a channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Step 3: Declare a Topic Exchange
	err = ch.ExchangeDeclare(
		"topic_logs", // Exchange name
		"topic",      // Exchange type (topic)
		true,         // Durable
		false,        // Auto-deleted
		false,        // Internal
		false,        // No-wait
		nil,          // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	// Step 4: Publish messages with different routing keys
	messages := map[string]string{
		"animal.dog.barking":  "Dog is barking",
		"animal.cat.meowing":  "Cat is meowing",
		"animal.dog.sleeping": "Dog is sleeping",
	}
	for key, body := range messages {
		err = ch.Publish(
			"topic_logs", // Exchange name
			key,          // Routing key
			false,        // Mandatory flag
			false,        // Immediate flag
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			},
		)
		if err != nil {
			log.Fatalf("Failed to publish %s: %v", key, err)
		}
		log.Printf(" [x] Sent %s: %s", key, body)
	}
}
Producer Code Explanation
Connecting to RabbitMQ:
amqp.Dial("amqp://guest:guest@localhost:5672/"): Connects to RabbitMQ with the default guest credentials. If the connection fails, the program exits with an error message.
Creating a Channel:
conn.Channel(): Creates a communication channel for sending and receiving messages.
Declaring the Topic Exchange:
ch.ExchangeDeclare: Declares a durable Topic Exchange named topic_logs. The type topic tells RabbitMQ to route each message by matching its routing key against the binding keys of the bound queues.
Publishing Messages:
ch.Publish: Sends messages to the topic_logs exchange. Each message is assigned a routing key (like animal.dog.barking). The exchange then delivers the message to every queue bound with a matching binding key. Because the mandatory flag is false, a message that matches no binding is dropped, so start the consumer before running the producer.
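One detail of the producer is worth a small sketch: it stores its messages in a Go map, and Go deliberately randomizes map iteration order, so the three messages may be published in a different order on each run. A map also allows only one body per routing key. The example below is one possible alternative, assuming you want a fixed order; the message type and the orderedMessages function are hypothetical names introduced for illustration, and the actual ch.Publish call is left out so the snippet runs without a broker.

```go
package main

import "fmt"

// message pairs a routing key with a body.
// Hypothetical type for illustration; it is not part of the amqp library.
type message struct {
	RoutingKey string
	Body       string
}

// orderedMessages returns the article's sample messages in a fixed order.
// A slice keeps insertion order and allows repeated routing keys.
func orderedMessages() []message {
	return []message{
		{RoutingKey: "animal.dog.barking", Body: "Dog is barking"},
		{RoutingKey: "animal.cat.meowing", Body: "Cat is meowing"},
		{RoutingKey: "animal.dog.sleeping", Body: "Dog is sleeping"},
	}
}

func main() {
	for _, m := range orderedMessages() {
		// In the producer, the ch.Publish call would go here, with
		// m.RoutingKey as the routing key and []byte(m.Body) as the body.
		fmt.Printf(" [x] Would send %s: %s\n", m.RoutingKey, m.Body)
	}
}
```

Whether ordering matters depends on the application; for the three independent log lines in this article, the map version works as well.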
Consumer Code
package main

import (
	"log"
	"os"

	"github.com/streadway/amqp"
)

func main() {
	// The binding key pattern is required as the first argument, e.g. "animal.dog.#"
	if len(os.Args) < 2 {
		log.Fatalf("Usage: %s <binding_key>", os.Args[0])
	}
	bindingKey := os.Args[1]

	// Step 1: Connect to RabbitMQ Server
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Step 2: Create a channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Step 3: Declare a temporary queue
	q, err := ch.QueueDeclare(
		"",    // Empty name: the server generates a unique queue name
		false, // Durable
		true,  // Auto-deleted
		false, // Exclusive
		false, // No-wait
		nil,   // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	// Step 4: Bind the queue to the Topic Exchange with a binding key pattern
	err = ch.QueueBind(
		q.Name,       // Queue name
		bindingKey,   // Binding key (may contain * and #)
		"topic_logs", // Exchange name
		false,        // No-wait
		nil,          // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	// Step 5: Start consuming messages from the queue
	msgs, err := ch.Consume(
		q.Name, // Queue name
		"",     // Consumer tag (empty string means automatic generation)
		true,   // Auto-acknowledge
		false,  // Exclusive
		false,  // No-local
		false,  // No-wait
		nil,    // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			// Log the routing key and body of each received message
			log.Printf("Received message [%s]: %s", d.RoutingKey, d.Body)
		}
	}()
	log.Printf("Waiting for messages with binding key: %s", bindingKey)
	<-forever // Block so the consumer keeps running
}
Consumer Code Explanation
Connecting to RabbitMQ:
amqp.Dial("amqp://guest:guest@localhost:5672/"): Connects to RabbitMQ with the default guest credentials.
Creating a Channel:
conn.Channel(): Creates a communication channel for consuming messages.
Declaring a Temporary Queue:
ch.QueueDeclare: Declares a temporary queue with no name (""). RabbitMQ generates a unique name for it, which the code reads from q.Name. The queue will be auto-deleted when the consumer disconnects.
Binding the Queue:
ch.QueueBind: Binds the queue to the topic_logs exchange with the binding key pattern passed on the command line (like animal.dog.#). The queue only receives messages whose routing key matches that pattern.
Consuming Messages:
ch.Consume: Starts consuming messages from the queue. The consumer acknowledges messages automatically (autoAck=true).
Message Handling:
- For each message received, the consumer logs the message content.
Key Takeaways
Topic Exchange is ideal for routing messages based on patterns using wildcards (* for exactly one word, and # for zero or more words).
It provides flexibility in routing messages to multiple consumers based on their interests, making it suitable for applications that need to handle specific categories of messages (like animal.dog.barking vs animal.cat.meowing).
The Producer sends messages to the exchange, and each Consumer filters the messages it receives through the binding key pattern of its queue.
Written by Shivam Dubey
