Golang Producer and Consumer with Kafka Simple

OpenDevOpenDev
2 min read

install package:

go get https://github.com/segmentio/kafka-go

Create Topic In Kafka:

  • create topic name logging

  • topic logging has 1 partition

Producer

  • create folder producer:

  • create file main.go in folder producer

copy and paste code:

package main

import (
    "context"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/google/uuid"
    "github.com/segmentio/kafka-go"
)

var (
    BrokerAddress = "localhost:9092"
    TopicLogging  = "logging"
)

func main() {
    Produce(context.Background())
}
func Produce(ctx context.Context) {
    i := 0

    l := log.New(os.Stdout, "kafka writer: ", 0)
    // intialize the writer with the broker addresses, and the topic
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{BrokerAddress},
        Topic:   TopicLogging,
        // assign the logger to the writer
        Logger: l,
        Async:  false,
    })

    for {
        writeMsg(ctx, w, i)
        time.Sleep(time.Millisecond)
    }
}

func writeMsg(ctx context.Context, w *kafka.Writer, i int) {
    msgs := kafka.Message{
        Key:   []byte(strconv.Itoa(i)),
        Value: []byte("timeNow:" + time.Now().Format(time.RFC3339Nano) + " " + "uuid:" + uuid.New().String()),
    }
    err := w.WriteMessages(ctx, msgs)
    if err != nil {
        panic("could not write message " + err.Error())
    }

}

Consumer

  • create folder consumer

  • create file main.go in folder consumer

copy and paste code

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

const (
    topic         = "topic"
    TopicLogging  = "logging"
    BrokerAddress = "localhost:9092"
    Group         = "logging-consumer-group-1"
)

func main() {
    s := make(chan bool)
    go Consume(context.Background(), TopicLogging, 0)
    <-s
}

func Consume(ctx context.Context, topic string, pa int) {
    // create a new logger that outputs to stdout
    // and has the `kafka reader` prefix
    l := log.New(os.Stdout, "kafka reader: ", 0)
    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{BrokerAddress},
        Topic:   topic,
        GroupID: Group,
        // assign the logger to the reader
        Logger: l,
        Dialer: dialer,
    })
    for {

        m, err := r.FetchMessage(ctx)

        if err != nil {
            break
        }
        fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Fatal("failed to commit messages:", err)
        }
    }
}

Test

  • push message to Kafka:

      go run producer/main.go
    

  • consume message from Kafka:

      go run consumer/main.go
    

source code: https://github.com/ducnpdev/open-dev/tree/master/kafka

0
Subscribe to my newsletter

Read articles from OpenDev directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

OpenDev
OpenDev

I am a developer from Vietnamese's