RabbitMQ Direct Exchange with Golang: Beginner's Guide

Shivam DubeyShivam Dubey
5 min read

A Direct Exchange is one of the four main exchange types in RabbitMQ. It is the simplest and most commonly used exchange type for point-to-point communication.

In this article, we’ll cover:

  • What is a Direct Exchange?

  • How it works using a clear flowchart.

  • Implementing Direct Exchange with Golang.

  • Detailed line-by-line code explanation for both the producer and consumer.

  • Explanation of functions, arguments, and their purpose.


πŸ§‘β€πŸ’» What is a Direct Exchange?

A Direct Exchange routes messages to queues based on a specific routing key. Each queue is bound to the exchange with a particular key, and only messages with a matching routing key will be delivered to that queue.

  • Producer sends messages to the exchange with a routing key.

  • Exchange matches the routing key to the binding key of the queues.

  • Consumer reads the messages from the queue.

πŸ“Š Flowchart: How Direct Exchange Works

                +-------------------+            
                |      Producer     |            
                +-------------------+            
                          |                         
                          |                         
                 +--------------------+            
                 |    Direct Exchange   |           
                 +--------------------+            
                       /     |     \               
                ______/      |      \______        
               /             |             \       
       +---------+     +---------+     +---------+ 
       | Queue 1 |     | Queue 2 |     | Queue 3 | 
       +---------+     +---------+     +---------+ 
            |               |               |      
     +---------+      +---------+      +---------+ 
     | Consumer|      | Consumer|      | Consumer| 
     +---------+      +---------+      +---------+
  • Queue1 is bound with routing key error

  • Queue2 is bound with routing key info

  • Queue3 is bound with routing key warning

  • Message with error routing key goes to Queue1.

  • Message with info routing key goes to Queue2.

  • Message with warning routing key goes to Queue3.

βœ… Key Points to Remember

  • Direct Exchange is ideal for routing messages to specific consumers.

  • Each queue is bound with a specific routing key.

  • Producers can send messages with a routing key that matches the queue binding.


πŸ›  Prerequisites

  • Install RabbitMQ.

  • Install Go.

  • Install the RabbitMQ client library for Go:

go get github.com/streadway/amqp

πŸš€ Producer Code

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a Direct Exchange
    err = ch.ExchangeDeclare(
        "direct_logs", // Exchange name
        "direct",      // Exchange type
        true,           // Durable
        false,          // Auto-deleted
        false,          // Internal
        false,          // No-wait
        nil,            // Additional arguments
    )

    // Step 4: Publish messages with different routing keys
    messages := map[string]string{
        "error": "Error message",
        "info":  "Info message",
        "warning": "Warning message",
    }

    for key, body := range messages {
        err = ch.Publish(
            "direct_logs", // Exchange name
            key,           // Routing key
            false,          // Mandatory flag
            false,          // Immediate flag
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body), // Message body
            },
        )
        log.Printf(" [x] Sent %s: %s", key, body) // Log the sent message
    }
}

Producer Code Explanation

  1. Connection Establishment:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): This connects to RabbitMQ using the default guest user and password.
  2. Creating a Channel:

    • conn.Channel(): Creates a channel through which communication with RabbitMQ happens.
  3. Declaring the Exchange:

    • ch.ExchangeDeclare: This function declares the Direct Exchange named direct_logs with type direct.
  4. Publishing Messages:

    • ch.Publish: Sends messages to the exchange. The key acts as the routing key, ensuring messages are delivered to the right queue.

πŸš€ Consumer Code

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a queue
    q, err := ch.QueueDeclare(
        "",      // Empty name for a temporary queue
        false,   // Durable
        true,    // Auto-deleted
        false,   // Exclusive
        false,   // No-wait
        nil,     // Additional arguments
    )

    // Step 4: Bind queue to exchange
    severity := os.Args[1] // Command-line argument specifying the routing key
    err = ch.QueueBind(
        q.Name,    // Queue name
        severity,  // Routing key (message severity)
        "direct_logs", // Exchange name
        false,
        nil,
    )

    msgs, err := ch.Consume(
        q.Name, // Queue name
        "",     // Consumer tag (empty string means automatic generation)
        true,   // Auto-acknowledge
        false,  // Exclusive
        false,  // No-local
        false,  // No-wait
        nil,    // Additional arguments
    )

    forever := make(chan bool)
    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body) // Log the message
        }
    }()
    log.Printf("Waiting for messages with severity: %s", severity)
    <-forever
}

Consumer Code Explanation

  1. Connection Establishment:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): This connects to RabbitMQ using the default guest user and password.
  2. Creating a Channel:

  3. Declaring a Queue:

    • ch.QueueDeclare: This declares a temporary queue (no name specified) for receiving messages. This queue will be automatically deleted when the consumer disconnects.
  4. Binding the Queue:

    • ch.QueueBind: Binds the queue to the direct_logs exchange with the specified routing key (severity).
  5. Consuming Messages:

    • ch.Consume: Starts listening for messages in the queue. The consumer will print any message it receives to the console.
  6. Message Handling:

    • forever := make(chan bool): Creates a channel to keep the program running and waiting for messages.

βœ… Key Takeaways

  • Direct Exchange is a simple and effective way to route messages to specific queues based on routing keys.

  • The Producer sends messages with specific routing keys, while the Consumer listens to queues bound to those routing keys.

  • This example demonstrates how to set up both a producer and a consumer in Go using RabbitMQ’s Direct Exchange.


0
Subscribe to my newsletter

Read articles from Shivam Dubey directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Shivam Dubey
Shivam Dubey