RabbitMQ Fanout Exchange with Golang: Beginner's Guide

Shivam Dubey

A Fanout Exchange is another important type of exchange in RabbitMQ. Unlike the Direct Exchange, which routes messages based on routing keys, a Fanout Exchange delivers messages to all queues bound to it, regardless of any routing key.

In this article, we'll cover:

  • What is a Fanout Exchange?

  • How it works with a clear flowchart.

  • Implementing Fanout Exchange with Golang.

  • Detailed line-by-line explanation of the producer and consumer code.


๐Ÿง‘โ€๐Ÿ’ป What is a Fanout Exchange?

A Fanout Exchange routes messages to all queues bound to it without considering a routing key. This is ideal for scenarios where a message should be delivered to multiple consumers simultaneously.

  • Producer sends messages to the exchange.

  • Exchange routes the message to all queues bound to it, without checking any routing key.

  • Consumers receive messages from the queues.

📊 Flowchart: How Fanout Exchange Works with Routing Key

A Fanout Exchange ignores whatever routing key the producer supplies and sends a copy of the message to every queue bound to it.

                +-------------------+            
                |      Producer     |            
                +-------------------+            
                          |                         
                          |                         
                 +--------------------+            
                 |    Fanout Exchange |           
                 +--------------------+            
                       /     |     \               
                ______/      |      \______        
               /             |             \       
       +---------+     +---------+     +---------+ 
       | Queue 1 |     | Queue 2 |     | Queue 3 | 
       +---------+     +---------+     +---------+ 
         |                |               |       
   +------------+   +------------+   +------------+  
   | Consumer 1 |   | Consumer 2 |   | Consumer 3 |  
   +------------+   +------------+   +------------+
  • Queue1, Queue2, and Queue3 are all bound to the Fanout Exchange.

  • The Producer sends a message with a routing key, for example "info".

  • Fanout Exchange ignores the routing key and sends the message to Queue1, Queue2, and Queue3.

  • All consumers connected to these queues will receive the message.
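
The routing behaviour described above can be sketched without a broker. The following is a toy, in-memory model, not RabbitMQ code: the fanoutExchange type and its bind and publish methods are hypothetical names invented for illustration. It only shows that the routing key plays no part in deciding which queues receive the message.

```go
package main

import "fmt"

// fanoutExchange is a toy, in-memory stand-in for a RabbitMQ fanout exchange.
// It is an illustration only and not part of any RabbitMQ client library.
type fanoutExchange struct {
	queues map[string][]string // queue name -> messages delivered so far
}

func newFanoutExchange() *fanoutExchange {
	return &fanoutExchange{queues: make(map[string][]string)}
}

// bind registers a queue with the exchange. The binding key is accepted only
// to mirror the shape of a real bind call; this model never looks at it.
func (e *fanoutExchange) bind(queue, bindingKey string) {
	if _, ok := e.queues[queue]; !ok {
		e.queues[queue] = nil
	}
}

// publish copies the message to every bound queue, ignores routingKey,
// and returns the number of queues that received the message.
func (e *fanoutExchange) publish(routingKey, body string) int {
	for name := range e.queues {
		e.queues[name] = append(e.queues[name], body)
	}
	return len(e.queues)
}

func main() {
	ex := newFanoutExchange()
	ex.bind("Queue1", "")
	ex.bind("Queue2", "error")
	ex.bind("Queue3", "anything")

	// The routing key "info" matches none of the binding keys above,
	// yet every queue still receives the message.
	n := ex.publish("info", "Hello, World!")
	fmt.Println("delivered to", n, "queues")
	for _, name := range []string{"Queue1", "Queue2", "Queue3"} {
		fmt.Println(name, ex.queues[name])
	}
}
```

In this model each queue ends up with its own copy of the message, which is the behaviour the flowchart illustrates.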


🛠 Prerequisites

  • Install RabbitMQ.

  • Install Go.

  • Install the RabbitMQ client library for Go:

go get github.com/streadway/amqp

🚀 Producer Code

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a Fanout Exchange
    err = ch.ExchangeDeclare(
        "logs",   // Exchange name
        "fanout", // Exchange type (fanout)
        true,     // Durable
        false,    // Auto-deleted
        false,    // Internal
        false,    // No-wait
        nil,      // Additional arguments
    )
    if err != nil {
        log.Fatal(err) // Stop here if the exchange could not be declared
    }

    // Step 4: Publish a message to the exchange
    err = ch.Publish(
        "logs",        // Exchange name
        "",            // Routing key is ignored
        false,         // Mandatory flag
        false,         // Immediate flag
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("Hello, World!"), // Message body
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [x] Sent 'Hello, World!'")
}

Producer Code Explanation

  1. Connecting to RabbitMQ:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): Connects to RabbitMQ using the default guest credentials. If the connection fails, the program exits with an error message.
  2. Creating a Channel:

    • conn.Channel(): Creates a communication channel. All operations in RabbitMQ (publishing, consuming) happen through a channel.
  3. Declaring the Exchange:

    • ch.ExchangeDeclare: This declares a Fanout Exchange named logs. The type fanout is specified, which means messages will be sent to all queues bound to this exchange.
  4. Publishing the Message:

    • ch.Publish: This sends a message to the exchange logs. The routing key is left empty because it is not used by a Fanout Exchange. The message body is "Hello, World!".
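
The producer above always sends the same hard-coded body. As a minimal sketch, assuming you would rather pass the message text on the command line, a small helper can build the body from the program arguments. The name bodyFrom is hypothetical and not part of the amqp library; the sketch only prints the result rather than publishing it.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// bodyFrom is a hypothetical helper: it joins the command-line arguments
// after the program name into one message, and falls back to the article's
// default text when no usable arguments are given.
func bodyFrom(args []string) string {
	if len(args) < 2 {
		return "Hello, World!"
	}
	body := strings.TrimSpace(strings.Join(args[1:], " "))
	if body == "" {
		return "Hello, World!"
	}
	return body
}

func main() {
	body := bodyFrom(os.Args)
	// In the producer, this value would replace the hard-coded body:
	//   Body: []byte(body),
	fmt.Printf(" [x] Would send %q\n", body)
}
```

With a helper like this, running the producer several times with different arguments is an easy way to watch each message reach every running consumer.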

🚀 Consumer Code

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a temporary queue
    q, err := ch.QueueDeclare(
        "",      // Empty name for a temporary queue
        false,   // Durable
        true,    // Auto-deleted
        false,   // Exclusive
        false,   // No-wait
        nil,     // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 4: Bind the queue to the Fanout Exchange
    err = ch.QueueBind(
        q.Name,   // Queue name
        "",       // No routing key (ignored by Fanout Exchange)
        "logs",   // Exchange name
        false,    // No-wait
        nil,      // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 5: Start consuming messages from the queue
    msgs, err := ch.Consume(
        q.Name, // Queue name
        "",     // Consumer tag (empty string means automatic generation)
        true,   // Auto-acknowledge
        false,  // Exclusive
        false,  // No-local
        false,  // No-wait
        nil,    // Additional arguments
    )
    if err != nil {
        log.Fatal(err) // Stop here if the consumer could not be registered
    }

    forever := make(chan bool)
    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body) // Log the received message
        }
    }()
    log.Printf("Waiting for messages...")
    <-forever
}

Consumer Code Explanation

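  1. Connecting to RabbitMQ:

    • amqp.Dial: Same as in the producer. It connects to RabbitMQ using the default guest credentials.
  2. Creating a Channel:

    • conn.Channel(): Same as in the producer. It creates the channel through which the queue is declared, bound, and consumed.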

  3. Declaring a Temporary Queue:

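    • ch.QueueDeclare: Declares a temporary queue with an empty name (""), so RabbitMQ generates a unique name for it. The queue is non-durable and auto-deleted, which means the server removes it once its last consumer disconnects. In this code the exclusive flag is set to false.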
  4. Binding the Queue:

    • ch.QueueBind: Binds the temporary queue to the logs exchange. The routing key is empty since Fanout Exchange ignores routing keys.
  5. Consuming Messages:

    • ch.Consume: Starts consuming messages from the queue. The consumer automatically acknowledges messages (since autoAck is true).
  6. Message Handling:

    • The consumer logs each message it receives from the queue.
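
The consumer above blocks on the forever channel and only stops when the process is killed. One possible alternative, sketched here under stated assumptions, is a loop that returns when either the message channel is closed or a stop signal arrives. The drain function and the done channel are hypothetical names, and plain byte slices stand in for the amqp.Delivery values that the real msgs channel carries, so the sketch runs without a broker.

```go
package main

import "fmt"

// drain calls handle for every message until msgs is closed or done is
// closed, and returns how many messages were handled. It is a hypothetical
// helper; in the real consumer, msgs would carry amqp.Delivery values.
func drain(done <-chan struct{}, msgs <-chan []byte, handle func([]byte)) int {
	n := 0
	for {
		select {
		case <-done:
			return n // Asked to stop, for example on Ctrl+C
		case body, ok := <-msgs:
			if !ok {
				return n // Message channel closed, nothing more will arrive
			}
			handle(body)
			n++
		}
	}
}

func main() {
	// Simulated deliveries standing in for the channel returned by Consume.
	msgs := make(chan []byte, 2)
	msgs <- []byte("Hello, World!")
	msgs <- []byte("second message")
	close(msgs)

	// done is never closed here; a real program might close it from a
	// goroutine that waits on os/signal notifications.
	done := make(chan struct{})

	n := drain(done, msgs, func(b []byte) {
		fmt.Printf("Received message: %s\n", b)
	})
	fmt.Println("handled", n, "messages")
}
```

Returning from the loop lets the deferred ch.Close() and conn.Close() calls in main run, which a program blocked on forever never reaches.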

✅ Key Takeaways

  • Fanout Exchange sends messages to all bound queues regardless of any routing key.

  • It is useful in scenarios where you need to broadcast messages to multiple consumers.

  • The producer sends a message to the Fanout Exchange, and all queues bound to this exchange will receive that message.

