RabbitMQ Topic Exchange with Golang: Beginner's Guide

Shivam Dubey

A Topic Exchange is another important exchange type in RabbitMQ. Unlike Direct and Fanout Exchanges, a Topic Exchange allows routing of messages to queues based on wildcard matching between the routing key and the binding key. This type of exchange is particularly useful for pattern-based routing.

In this article, we'll cover:

  • What is a Topic Exchange?

  • How it works with a flowchart.

  • Implementing Topic Exchange with Golang.

  • Detailed line-by-line explanation of the producer and consumer code.


๐Ÿง‘โ€๐Ÿ’ป What is a Topic Exchange?

A Topic Exchange routes messages to queues based on a routing key pattern. The routing key is a string, and queues can be bound to the exchange using wildcard patterns. Wildcards allow for flexibility in routing messages to multiple queues.

  • Producer sends messages to the exchange with a routing key.

  • Exchange matches the routing key to the binding key of the queues using a wildcard matching pattern.

  • Consumer reads the messages from the queue.

📊 Flowchart: How Topic Exchange Works

Routing keys are dot-separated words such as animal.dog.barking, and binding keys are patterns such as animal.*.*. In a binding key, the wildcard * matches exactly one word and # matches zero or more words.

                +-------------------+            
                |      Producer     |            
                +-------------------+            
                          |                         
                          |                         
                 +--------------------+            
                 |    Topic Exchange  |           
                 +--------------------+            
                       /     |     \               
                ______/      |      \______        
               /             |             \       
       +---------+     +---------+     +---------+ 
       | Queue 1 |     | Queue 2 |     | Queue 3 | 
       +---------+     +---------+     +---------+ 
        |    |            |             |       
   +------------+   +------------+   +------------+  
   | Consumer 1 |   | Consumer 2 |   | Consumer 3 |  
   +------------+   +------------+   +------------+
  • Queue 1 is bound with the binding key animal.dog.* and will receive messages like animal.dog.barking.

  • Queue 2 is bound with the binding key animal.*.barking and will receive messages like animal.dog.barking or animal.cat.barking.

  • Queue 3 is bound with the binding key animal.# and will receive any message whose routing key starts with animal, no matter how many words follow.
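
To make the wildcard rules concrete, here is a minimal sketch in Go that models how the three bindings above would treat the routing keys used later in this article. It illustrates the matching semantics only and is not how the RabbitMQ broker implements them. The names matchTopic and matchWords are hypothetical helpers, not part of any RabbitMQ library.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey matches the binding pattern.
// Hypothetical helper: "*" matches exactly one word, "#" matches zero or more.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares the pattern and the key word by word.
func matchWords(pat, key []string) bool {
	if len(pat) == 0 {
		// The pattern is used up, so the key must be used up as well.
		return len(key) == 0
	}
	switch pat[0] {
	case "#":
		// "#" either absorbs nothing, or absorbs one word and tries again.
		if matchWords(pat[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pat, key[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(key) > 0 && matchWords(pat[1:], key[1:])
	default:
		// A literal word must be equal to the next word of the key.
		return len(key) > 0 && pat[0] == key[0] && matchWords(pat[1:], key[1:])
	}
}

func main() {
	// Bindings taken from the flowchart description.
	bindings := []struct{ queue, pattern string }{
		{"Queue 1", "animal.dog.*"},
		{"Queue 2", "animal.*.barking"},
		{"Queue 3", "animal.#"},
	}
	keys := []string{"animal.dog.barking", "animal.cat.meowing", "animal.dog.sleeping"}

	for _, key := range keys {
		for _, b := range bindings {
			if matchTopic(b.pattern, key) {
				fmt.Printf("%s -> %s (%s)\n", key, b.queue, b.pattern)
			}
		}
	}
}
```

Under this model, animal.dog.barking reaches all three queues, animal.cat.meowing reaches only Queue 3, and animal.dog.sleeping reaches Queue 1 and Queue 3.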


🛠 Prerequisites

  • Install RabbitMQ.

  • Install Go.

  • Install the RabbitMQ client library for Go:

go get github.com/streadway/amqp

🚀 Producer Code

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a Topic Exchange
    err = ch.ExchangeDeclare(
        "topic_logs",  // Exchange name
        "topic",       // Exchange type (topic)
        true,          // Durable
        false,         // Auto-deleted
        false,         // Internal
        false,         // No-wait
        nil,           // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 4: Publish messages with different routing keys
    messages := map[string]string{
        "animal.dog.barking":    "Dog is barking",
        "animal.cat.meowing":    "Cat is meowing",
        "animal.dog.sleeping":   "Dog is sleeping",
    }

    // Go randomizes map iteration order, so the messages are sent in no fixed order.
    for key, body := range messages {
        err = ch.Publish(
            "topic_logs",  // Exchange name
            key,           // Routing key
            false,         // Mandatory flag
            false,         // Immediate flag
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(body),
            },
        )
        if err != nil {
            log.Fatal(err)
        }
        log.Printf(" [x] Sent %s: %s", key, body)
    }
}

Producer Code Explanation

  1. Connecting to RabbitMQ:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): Connects to a RabbitMQ server on localhost with the default guest credentials. If the connection fails, log.Fatal prints the error and the program exits.
  2. Creating a Channel:

    • conn.Channel(): Creates a communication channel for sending and receiving messages.
  3. Declaring the Topic Exchange:

    • ch.ExchangeDeclare: Declares a durable Topic Exchange named topic_logs. The type topic tells RabbitMQ to route each message by matching its routing key against the wildcard patterns of the queue bindings.
  4. Publishing Messages:

    • ch.Publish: Sends messages to the topic_logs exchange. Each message is published with a routing key (like animal.dog.barking), and the exchange delivers a copy to every queue whose binding key matches. Because the mandatory flag is false, a message that matches no binding is silently dropped.
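
One detail of the producer is easy to miss: Go randomizes map iteration order, so the three messages may be published in a different order on every run. If a fixed order matters, a slice can replace the map. The following is a minimal sketch under that assumption. The message type and the orderedMessages function are hypothetical names introduced for illustration, and the broker call is left out so the snippet runs without RabbitMQ.

```go
package main

import "fmt"

// message pairs a routing key with a body (hypothetical type for illustration).
type message struct {
	key  string
	body string
}

// orderedMessages returns the article's three messages in a fixed order.
func orderedMessages() []message {
	return []message{
		{key: "animal.dog.barking", body: "Dog is barking"},
		{key: "animal.cat.meowing", body: "Cat is meowing"},
		{key: "animal.dog.sleeping", body: "Dog is sleeping"},
	}
}

func main() {
	for _, m := range orderedMessages() {
		// In the real producer, ch.Publish would be called here with m.key and m.body.
		fmt.Printf(" [x] Would send %s: %s\n", m.key, m.body)
	}
}
```

A slice iterates in index order, so the messages go out in the order they are listed.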

🚀 Consumer Code

package main

import (
    "log"
    "os"
    "github.com/streadway/amqp"
)

func main() {
    // Step 1: Connect to RabbitMQ Server
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Step 2: Create a channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Step 3: Declare a temporary queue
    q, err := ch.QueueDeclare(
        "",      // Empty name for a temporary queue
        false,   // Durable
        true,    // Auto-deleted
        false,   // Exclusive
        false,   // No-wait
        nil,     // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 4: Bind the queue to the Topic Exchange with a binding key pattern
    if len(os.Args) < 2 {
        log.Fatalf("Usage: %s <binding_key>", os.Args[0])
    }
    routingKey := os.Args[1] // Binding key pattern, e.g., "animal.dog.#"
    err = ch.QueueBind(
        q.Name,       // Queue name
        routingKey,   // Binding key (pattern)
        "topic_logs", // Exchange name
        false,        // No-wait
        nil,          // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Step 5: Start consuming messages from the queue
    msgs, err := ch.Consume(
        q.Name, // Queue name
        "",     // Consumer tag (empty string means automatic generation)
        true,   // Auto-acknowledge
        false,  // Exclusive
        false,  // No-local
        false,  // No-wait
        nil,    // Additional arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    forever := make(chan bool)
    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body) // Log the received message
        }
    }()
    log.Printf("Waiting for messages with routing key: %s", routingKey)
    <-forever
}

Consumer Code Explanation

  1. Connecting to RabbitMQ:

    • amqp.Dial("amqp://guest:guest@localhost:5672/"): Connects to RabbitMQ with the default guest credentials.
  2. Creating a Channel:

    • conn.Channel(): Creates a communication channel for consuming messages.
  3. Declaring a Temporary Queue:

    • ch.QueueDeclare: Declares a temporary queue with an empty name (""), so RabbitMQ generates a unique name that the code reads from q.Name. Because auto-delete is true, the queue is deleted once its last consumer disconnects.
  4. Binding the Queue:

    • ch.QueueBind: Binds the queue to the topic_logs exchange with the binding key pattern given on the command line (like animal.dog.#). The queue then receives only messages whose routing key matches that pattern.
  5. Consuming Messages:

    • ch.Consume: Starts consuming messages from the queue. With autoAck=true, the broker treats a message as acknowledged as soon as it is delivered, so a message can be lost if the consumer crashes before handling it.
  6. Message Handling:

    • A goroutine ranges over the msgs channel and logs the body of each message received, while main blocks on the forever channel to keep the program running.
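
The consumer above accepts a single binding key, but a queue can be bound to the same exchange several times with different keys. Below is a minimal sketch of how the command-line arguments could be checked before binding. The parseBindingKeys function is a hypothetical helper, and the 255-byte limit reflects the AMQP 0-9-1 rule that routing and binding keys are short strings. The broker calls are left out so the snippet runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// maxKeyBytes is the AMQP 0-9-1 short-string limit for routing and binding keys.
const maxKeyBytes = 255

// parseBindingKeys checks the binding keys given on the command line
// (hypothetical helper). It returns the keys unchanged when all are valid.
func parseBindingKeys(args []string) ([]string, error) {
	if len(args) == 0 {
		return nil, errors.New("usage: consumer <binding_key> [binding_key...]")
	}
	for _, k := range args {
		if len(k) > maxKeyBytes {
			return nil, fmt.Errorf("binding key is %d bytes, limit is %d", len(k), maxKeyBytes)
		}
	}
	return args, nil
}

func main() {
	// In the real consumer, os.Args[1:] would be passed instead of this fixed slice.
	keys, err := parseBindingKeys([]string{"animal.dog.#", "animal.*.meowing"})
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, k := range keys {
		// In the real consumer, ch.QueueBind(q.Name, k, "topic_logs", false, nil) would go here.
		fmt.Printf("would bind the queue with key %q\n", k)
	}
}
```

Binding one queue with several keys means the queue receives a message when any of the patterns matches its routing key.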

✅ Key Takeaways

  • Topic Exchange is ideal for routing messages based on patterns using wildcards (* for exactly one word, and # for zero or more words).

  • It provides flexibility in routing messages to multiple consumers based on their interests, making it suitable for applications that need to handle specific categories of messages (like animal.dog.barking vs animal.cat.meowing).

  • The Producer sends messages to the exchange with a routing key, and each Consumer filters the messages it receives by binding its queue with a binding key pattern.

