RabbitMQ Headers Exchange with Golang: Beginner's Guide

Shivam DubeyShivam Dubey
5 min read

In RabbitMQ, the Headers Exchange is one of the exchange types used to route messages based on message headers. Unlike the other types of exchanges (Direct, Fanout, and Topic), a Headers Exchange doesn’t use a routing key for message routing. Instead, it relies on the message's header attributes to decide where to route the message. This exchange type is useful when routing decisions depend on multiple header values or specific attributes of the message.

In this article, we’ll cover:

  • What is a Headers Exchange?

  • How it works with a flowchart.

  • Implementing Headers Exchange with Golang.

  • Detailed line-by-line explanation of both producer and consumer code.


πŸ§‘β€πŸ’» What is a Headers Exchange?

A Headers Exchange allows messages to be routed based on the headers of the message, which are key-value pairs set by the producer. This type of exchange does not use routing keys to direct messages but instead uses the message's headers to determine which queues should receive the message.

  • Producer sends messages with specific header attributes.

  • Exchange matches the message headers with the bindings defined for each queue.

  • Consumer reads the messages from the queue.

πŸ“Š Flowchart: How Headers Exchange Works

The headers exchange uses message headers to route messages to the correct queue. Here's how it works:

                +-------------------+            
                |      Producer     |            
                +-------------------+            
                          |                         
                          |                         
                 +--------------------+            
                 |  Headers Exchange  |           
                 +--------------------+            
                       /     |     \               
                ______/      |      \______        
               /             |             \       
       +---------+     +---------+     +---------+ 
       | Queue 1 |     | Queue 2 |     | Queue 3 | 
       +---------+     +---------+     +---------+ 
            |               |               |      
     +---------+      +---------+      +---------+ 
     | Consumer|      | Consumer|      | Consumer| 
     +---------+      +---------+      +---------+
  • Queue1 is bound with header {"category": "error"} and will only receive messages with the header category=error.

  • Queue2 is bound with header {"category": "info"} and will only receive messages with the header category=info.

  • Queue3 is bound with header {"category": "warning"} and will only receive messages with the header category=warning.


πŸ›  Prerequisites

Before implementing the code, ensure you have the following setup:

  • Install RabbitMQ.

  • Install Go.

  • Install the RabbitMQ client library for Go:

go get github.com/streadway/amqp

πŸš€ Producer Code

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a Headers Exchange
    err = ch.ExchangeDeclare(
        "headers_logs", // Exchange name
        "headers",      // Exchange type
        true,           // Durable
        false,          // Auto-deleted
        false,          // Internal
        false,          // No-wait
        nil,            // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 4: Publish messages with headers
    headers := map[string]map[string]string{
        "error":   {"category": "error", "message": "An error occurred"},
        "info":    {"category": "info", "message": "Informational message"},
        "warning": {"category": "warning", "message": "Warning message"},
    }

    for key, header := range headers {
        err = ch.Publish(
            "headers_logs",  // Exchange name
            "",              // Routing key is empty in headers exchange
            false,           // Mandatory flag
            false,           // Immediate flag
            amqp.Publishing{
                ContentType: "text/plain",
                Headers:     header,  // Add headers
                Body:        []byte(header["message"]),
            },
        )
        if err != nil {
            log.Fatal(err)
        }
        log.Printf(" [x] Sent: %s", key)
    }
}

Producer Code Explanation

  1. Connecting to RabbitMQ:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): Establishes a connection to the RabbitMQ server using the default guest credentials.

    • If the connection fails, the program will exit with an error message.

  2. Creating a Channel:

    • conn.Channel(): Creates a channel through which messages will be sent and received. The channel is the main communication medium with RabbitMQ.
  3. Declaring the Headers Exchange:

    • ch.ExchangeDeclare: Declares a Headers Exchange called headers_logs that will route messages based on their headers. The type headers is specified for this exchange.
  4. Publishing Messages:

    • ch.Publish: Sends messages to the headers_logs exchange. The routing key is empty, as headers exchange does not use routing keys. Instead, it matches the headers of the message with the header values bound to queues.

πŸš€ Consumer Code

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a temporary queue
    q, err := ch.QueueDeclare(
        "",      // Empty name for a temporary queue
        false,   // Durable
        true,    // Auto-deleted
        false,   // Exclusive
        false,   // No-wait
        nil,     // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 4: Bind the queue to the Headers Exchange with specific headers
    header := amqp.Table{
        "category": "error", // Binding header (category=error)
    }

    err = ch.QueueBind(
        q.Name,        // Queue name
        "",            // Routing key is not used in headers exchange
        "headers_logs", // Exchange name
        false,          // No-wait
        header,         // Bind with headers
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 5: Start consuming messages from the queue
    msgs, err := ch.Consume(
        q.Name, // Queue name
        "",     // Consumer tag (empty string means automatic generation)
        true,   // Auto-acknowledge
        false,  // Exclusive
        false,  // No-local
        false,  // No-wait
        nil,    // Additional arguments
    )

    forever := make(chan bool)
    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body) // Log the received message
        }
    }()
    log.Printf("Waiting for messages with headers: %v", header)
    <-forever
}

Consumer Code Explanation

  1. Connecting to RabbitMQ:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): Connects to RabbitMQ with default credentials (guest:guest).
  2. Creating a Channel:

    • conn.Channel(): Creates a channel to communicate with RabbitMQ.
  3. Declaring a Temporary Queue:

    • ch.QueueDeclare: Declares a temporary queue that will be auto-deleted when the consumer disconnects.
  4. Binding the Queue:

    • ch.QueueBind: Binds the queue to the headers_logs exchange using the specified header (category=error). Only messages with the matching header will be routed to this queue.
  5. Consuming Messages:

    • ch.Consume: Starts consuming messages from the queue. It will automatically acknowledge messages as they are received.
  6. Message Handling:

    • For each received message, the consumer prints the message body to the console.

βœ… Key Takeaways

  • Headers Exchange routes messages based on header values, making it flexible for scenarios where multiple attributes need to be checked to decide message routing.

  • Unlike other exchanges, headers exchanges do not use routing keys.

  • A producer can send messages with specific headers, and consumers can filter messages based on header values.

  • This approach is useful in situations where you want to route messages based on complex conditions such as multiple attributes.


0
Subscribe to my newsletter

Read articles from Shivam Dubey directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Shivam Dubey
Shivam Dubey