RabbitMQ Headers Exchange with Golang: Beginner's Guide


In RabbitMQ, the Headers Exchange is one of the exchange types used to route messages based on message headers. Unlike Direct and Topic exchanges, which route on the routing key, a Headers Exchange ignores the routing key entirely (a Fanout exchange ignores it too, but simply broadcasts to every bound queue). Instead, it relies on the message's header attributes to decide where to route the message. This exchange type is useful when routing decisions depend on multiple header values or specific attributes of the message.
In this article, we'll cover:
What is a Headers Exchange?
How it works with a flowchart.
Implementing Headers Exchange with Golang.
Detailed line-by-line explanation of both producer and consumer code.
What is a Headers Exchange?
A Headers Exchange allows messages to be routed based on the headers of the message, which are key-value pairs set by the producer. This type of exchange does not use routing keys to direct messages but instead uses the message's headers to determine which queues should receive the message.
Producer sends messages with specific header attributes.
Exchange matches the message headers with the bindings defined for each queue.
Consumer reads the messages from the queue.
Flowchart: How Headers Exchange Works
The headers exchange uses message headers to route messages to the correct queue. Here's how it works:
             +-------------------+
             |     Producer      |
             +-------------------+
                       |
                       |
            +--------------------+
            |  Headers Exchange  |
            +--------------------+
                 /     |     \
          ______/      |      \______
        /              |              \
+---------+       +---------+       +---------+
| Queue 1 |       | Queue 2 |       | Queue 3 |
+---------+       +---------+       +---------+
     |                 |                 |
+---------+       +---------+       +---------+
| Consumer|       | Consumer|       | Consumer|
+---------+       +---------+       +---------+
Queue1 is bound with header {"category": "error"} and will only receive messages with the header category=error.
Queue2 is bound with header {"category": "info"} and will only receive messages with the header category=info.
Queue3 is bound with header {"category": "warning"} and will only receive messages with the header category=warning.
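The three bindings above can be modeled in a few lines of plain Go. This is only a sketch of the matching rule, not RabbitMQ's implementation: the binding type and the route and demoBindings functions are hypothetical names, and the sketch assumes the broker's default behavior, in which every header named in a binding must be present in the message with an equal value.

```go
package main

import "fmt"

// binding is a hypothetical stand-in for a queue bound to a headers exchange.
type binding struct {
	queue string
	args  map[string]string // headers the message must carry
}

// route returns the queues whose binding headers all appear in msg
// with equal values. Extra headers on the message are ignored.
func route(bindings []binding, msg map[string]string) []string {
	var queues []string
	for _, b := range bindings {
		matched := true
		for k, want := range b.args {
			if got, ok := msg[k]; !ok || got != want {
				matched = false
				break
			}
		}
		if matched {
			queues = append(queues, b.queue)
		}
	}
	return queues
}

// demoBindings mirrors the three queues in the flowchart.
func demoBindings() []binding {
	return []binding{
		{queue: "Queue1", args: map[string]string{"category": "error"}},
		{queue: "Queue2", args: map[string]string{"category": "info"}},
		{queue: "Queue3", args: map[string]string{"category": "warning"}},
	}
}

func main() {
	msg := map[string]string{"category": "error", "message": "An error occurred"}
	fmt.Println(route(demoBindings(), msg)) // prints [Queue1]
}
```

A message whose category matches none of the bindings is routed to no queue at all, which is why route can return an empty result.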
Prerequisites
Before implementing the code, ensure you have the following setup:
Install RabbitMQ.
Install Go.
Install the RabbitMQ client library for Go:
go get github.com/streadway/amqp
Producer Code
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// Step 1: Connect to RabbitMQ Server
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Step 2: Create a channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Step 3: Declare a Headers Exchange
	err = ch.ExchangeDeclare(
		"headers_logs", // Exchange name
		"headers",      // Exchange type
		true,           // Durable
		false,          // Auto-deleted
		false,          // Internal
		false,          // No-wait
		nil,            // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	// Step 4: Publish messages with headers.
	// Publishing.Headers is an amqp.Table (map[string]interface{}),
	// so the headers are declared with that type rather than map[string]string.
	headers := map[string]amqp.Table{
		"error":   {"category": "error", "message": "An error occurred"},
		"info":    {"category": "info", "message": "Informational message"},
		"warning": {"category": "warning", "message": "Warning message"},
	}
	for key, header := range headers {
		// The "message" header doubles as the message body.
		body, _ := header["message"].(string)
		err = ch.Publish(
			"headers_logs", // Exchange name
			"",             // Routing key is empty in headers exchange
			false,          // Mandatory flag
			false,          // Immediate flag
			amqp.Publishing{
				ContentType: "text/plain",
				Headers:     header, // Add headers
				Body:        []byte(body),
			},
		)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf(" [x] Sent: %s", key)
	}
}
Producer Code Explanation
Connecting to RabbitMQ: amqp.Dial("amqp://guest:guest@localhost:5672/") establishes a connection to the RabbitMQ server using the default guest credentials. If the connection fails, the program exits with an error message.
Creating a Channel: conn.Channel() creates a channel through which messages will be sent and received. The channel is the main communication medium with RabbitMQ.
Declaring the Headers Exchange: ch.ExchangeDeclare declares a Headers Exchange called headers_logs that will route messages based on their headers. The type headers is specified for this exchange.
Publishing Messages: ch.Publish sends messages to the headers_logs exchange. The routing key is empty, as a headers exchange does not use routing keys. Instead, it matches the headers of the message with the header values bound to queues.
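The connection string is hard-coded in the code above. One possible refinement, sketched here under assumptions, is to read it from the environment and fall back to the default guest URL. The AMQP_URL variable and the urlOrDefault helper are hypothetical names chosen for this illustration; neither RabbitMQ nor the client library reads such a variable on its own.

```go
package main

import (
	"fmt"
	"os"
)

// defaultURL is the connection string used throughout this article.
const defaultURL = "amqp://guest:guest@localhost:5672/"

// urlOrDefault returns configured when it is non-empty and
// defaultURL otherwise.
func urlOrDefault(configured string) string {
	if configured != "" {
		return configured
	}
	return defaultURL
}

func main() {
	// AMQP_URL is a hypothetical variable name, not a RabbitMQ convention.
	url := urlOrDefault(os.Getenv("AMQP_URL"))
	// The result would be passed to amqp.Dial in place of the string literal.
	fmt.Println("connecting to", url)
}
```

This matters mostly outside local development, since a default RabbitMQ installation only accepts the guest user on connections from localhost.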
Consumer Code
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// Step 1: Connect to RabbitMQ Server
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Step 2: Create a channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Step 3: Declare a temporary queue
	q, err := ch.QueueDeclare(
		"",    // Empty name for a temporary queue
		false, // Durable
		true,  // Auto-deleted
		false, // Exclusive
		false, // No-wait
		nil,   // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	// Step 4: Bind the queue to the Headers Exchange with specific headers.
	// The exchange must already exist (the producer declares it),
	// otherwise the bind fails.
	header := amqp.Table{
		"category": "error", // Binding header (category=error)
	}
	err = ch.QueueBind(
		q.Name,         // Queue name
		"",             // Routing key is not used in headers exchange
		"headers_logs", // Exchange name
		false,          // No-wait
		header,         // Bind with headers
	)
	if err != nil {
		log.Fatal(err)
	}

	// Step 5: Start consuming messages from the queue
	msgs, err := ch.Consume(
		q.Name, // Queue name
		"",     // Consumer tag (empty string means automatic generation)
		true,   // Auto-acknowledge
		false,  // Exclusive
		false,  // No-local
		false,  // No-wait
		nil,    // Additional arguments
	)
	if err != nil {
		log.Fatal(err)
	}

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Received message: %s", d.Body) // Log the received message
		}
	}()
	log.Printf("Waiting for messages with headers: %v", header)
	<-forever // Block so the consumer keeps running
}
Consumer Code Explanation
Connecting to RabbitMQ: amqp.Dial("amqp://guest:guest@localhost:5672/") connects to RabbitMQ with default credentials (guest:guest).
Creating a Channel: conn.Channel() creates a channel to communicate with RabbitMQ.
Declaring a Temporary Queue: ch.QueueDeclare declares a temporary queue that will be auto-deleted when the consumer disconnects.
Binding the Queue: ch.QueueBind binds the queue to the headers_logs exchange using the specified header (category=error). Only messages with the matching header will be routed to this queue.
Consuming Messages: ch.Consume starts consuming messages from the queue. It will automatically acknowledge messages as they are received.
Message Handling: For each received message, the consumer prints the message body to the console.
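The handler prints only the body. If the consumer also needs the header that caused the routing, it can read it from the delivery's Headers field, which in the client library is an amqp.Table (a map[string]interface{}). The sketch below uses a plain map as a stand-in so that it runs without a broker; categoryOf is a hypothetical helper name.

```go
package main

import "fmt"

// categoryOf extracts the "category" header as a string.
// The second result is false when the header is absent or is not a string.
func categoryOf(headers map[string]interface{}) (string, bool) {
	v, ok := headers["category"].(string)
	return v, ok
}

func main() {
	// Stand-in for d.Headers on a received delivery.
	headers := map[string]interface{}{
		"category": "error",
		"message":  "An error occurred",
	}
	if c, ok := categoryOf(headers); ok {
		fmt.Println("category:", c) // prints category: error
	}
}
```

Inside the consume loop, the call would look like categoryOf(d.Headers). The comma-ok type assertion keeps the consumer from panicking if a producer sends the header with a non-string value.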
Key Takeaways
Headers Exchange routes messages based on header values, making it flexible for scenarios where multiple attributes need to be checked to decide message routing.
Unlike Direct and Topic exchanges, headers exchanges ignore the routing key and route on header values instead.
A producer can send messages with specific headers, and consumers can filter messages based on header values.
This approach is useful in situations where you want to route messages based on complex conditions such as multiple attributes.
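Matching on several attributes is controlled by the x-match binding argument: "all" (the default) requires every bound header to match, while "any" requires at least one. The following broker-free sketch illustrates that rule; the matches function and the service header are hypothetical, and the sketch ignores details such as value types.

```go
package main

import "fmt"

// matches reports whether msg satisfies the binding arguments args.
// The "x-match" entry selects the mode and is not itself compared.
func matches(args, msg map[string]string) bool {
	anyMode := args["x-match"] == "any"
	for k, want := range args {
		if k == "x-match" {
			continue
		}
		got, ok := msg[k]
		hit := ok && got == want
		if anyMode && hit {
			return true // one matching header is enough
		}
		if !anyMode && !hit {
			return false // every header must match
		}
	}
	// "all": nothing failed. "any": nothing matched.
	return !anyMode
}

func main() {
	msg := map[string]string{"category": "error", "service": "auth"}
	all := map[string]string{"x-match": "all", "category": "error", "service": "billing"}
	anyOf := map[string]string{"x-match": "any", "category": "error", "service": "billing"}
	fmt.Println(matches(all, msg), matches(anyOf, msg)) // prints false true
}
```

With the client library, the same arguments would be passed as the amqp.Table given to ch.QueueBind, for example amqp.Table{"x-match": "any", "category": "error", "service": "billing"}.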
Written by Shivam Dubey
