RabbitMQ Direct Exchange with Golang: Beginner's Guide


A Direct Exchange is one of the four main exchange types in RabbitMQ. It is the simplest and most commonly used exchange type for point-to-point communication.
In this article, weβll cover:
What is a Direct Exchange?
How it works using a clear flowchart.
Implementing Direct Exchange with Golang.
Detailed line-by-line code explanation for both the producer and consumer.
Explanation of functions, arguments, and their purpose.
π§βπ» What is a Direct Exchange?
A Direct Exchange routes messages to queues based on a specific routing key. Each queue is bound to the exchange with a particular key, and only messages with a matching routing key will be delivered to that queue.
Producer sends messages to the exchange with a routing key.
Exchange matches the routing key to the binding key of the queues.
Consumer reads the messages from the queue.
π Flowchart: How Direct Exchange Works
+-------------------+
| Producer |
+-------------------+
|
|
+--------------------+
| Direct Exchange |
+--------------------+
/ | \
______/ | \______
/ | \
+---------+ +---------+ +---------+
| Queue 1 | | Queue 2 | | Queue 3 |
+---------+ +---------+ +---------+
| | |
+---------+ +---------+ +---------+
| Consumer| | Consumer| | Consumer|
+---------+ +---------+ +---------+
Queue1 is bound with routing key
error
Queue2 is bound with routing key
info
Queue3 is bound with routing key
warning
Message with
error
routing key goes to Queue1.Message with
info
routing key goes to Queue2.Message with
warning
routing key goes to Queue3.
β Key Points to Remember
Direct Exchange is ideal for routing messages to specific consumers.
Each queue is bound with a specific routing key.
Producers can send messages with a routing key that matches the queue binding.
π Prerequisites
Install RabbitMQ.
Install Go.
Install the RabbitMQ client library for Go:
go get github.com/streadway/amqp
π Producer Code
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// Step 1: Connect to RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Step 2: Create a channel
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
// Step 3: Declare a Direct Exchange
err = ch.ExchangeDeclare(
"direct_logs", // Exchange name
"direct", // Exchange type
true, // Durable
false, // Auto-deleted
false, // Internal
false, // No-wait
nil, // Additional arguments
)
// Step 4: Publish messages with different routing keys
messages := map[string]string{
"error": "Error message",
"info": "Info message",
"warning": "Warning message",
}
for key, body := range messages {
err = ch.Publish(
"direct_logs", // Exchange name
key, // Routing key
false, // Mandatory flag
false, // Immediate flag
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body), // Message body
},
)
log.Printf(" [x] Sent %s: %s", key, body) // Log the sent message
}
}
Producer Code Explanation
Connection Establishment:
amqp.Dial("amqp://guest:
guest@localhost:5672
/")
: This connects to RabbitMQ using the defaultguest
user and password.
Creating a Channel:
conn.Channel
()
: Creates a channel through which communication with RabbitMQ happens.
Declaring the Exchange:
ch.ExchangeDeclare
: This function declares the Direct Exchange nameddirect_logs
with typedirect
.
Publishing Messages:
ch.Publish
: Sends messages to the exchange. Thekey
acts as the routing key, ensuring messages are delivered to the right queue.
π Consumer Code
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
// Step 1: Connect to RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Step 2: Create a channel
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
// Step 3: Declare a queue
q, err := ch.QueueDeclare(
"", // Empty name for a temporary queue
false, // Durable
true, // Auto-deleted
false, // Exclusive
false, // No-wait
nil, // Additional arguments
)
// Step 4: Bind queue to exchange
severity := os.Args[1] // Command-line argument specifying the routing key
err = ch.QueueBind(
q.Name, // Queue name
severity, // Routing key (message severity)
"direct_logs", // Exchange name
false,
nil,
)
msgs, err := ch.Consume(
q.Name, // Queue name
"", // Consumer tag (empty string means automatic generation)
true, // Auto-acknowledge
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Additional arguments
)
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received message: %s", d.Body) // Log the message
}
}()
log.Printf("Waiting for messages with severity: %s", severity)
<-forever
}
Consumer Code Explanation
Connection Establishment:
amqp.Dial("amqp://guest:
guest@localhost:5672
/")
: This connects to RabbitMQ using the defaultguest
user and password.
Creating a Channel:
conn.Channel
()
: Creates a channel for message reception.
Declaring a Queue:
ch.QueueDeclare
: This declares a temporary queue (no name specified) for receiving messages. This queue will be automatically deleted when the consumer disconnects.
Binding the Queue:
ch.QueueBind
: Binds the queue to thedirect_logs
exchange with the specified routing key (severity).
Consuming Messages:
ch.Consume
: Starts listening for messages in the queue. The consumer will print any message it receives to the console.
Message Handling:
forever := make(chan bool)
: Creates a channel to keep the program running and waiting for messages.
β Key Takeaways
Direct Exchange is a simple and effective way to route messages to specific queues based on routing keys.
The Producer sends messages with specific routing keys, while the Consumer listens to queues bound to those routing keys.
This example demonstrates how to set up both a producer and a consumer in Go using RabbitMQβs Direct Exchange.
Subscribe to my newsletter
Read articles from Shivam Dubey directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
