Building a Scalable Chat System: My Learnings with Concurrency and Async Message Processing in Go

Farhan KhojaFarhan Khoja
8 min read

The Challenge: Real-Time Chat message storage at scale

Building a production-ready chat system that handles thousands of concurrent conversations isn't just about storing messages in a database. It's about orchestrating multiple components that work together seamlessly while maintaining performance, reliability, and data consistency.

Recently, I architected and built a distributed chat system that handles real-time conversations with AI assistants, and I want to share the key insights around concurrency patterns and asynchronous message processing that made it scale.

System Architecture Overview

Core Components:

  • HTTP API Server: Handles incoming chat requests

  • AWS Knowledge Base Integration: Processes user queries against vector databases

  • SQS Message Queue: Decouples message processing from API responses

  • Pusher: Realtime communication channel between User and Server

  • Worker Pool: Concurrent message processors

  • MySQL Database: Persistent storage with optimized schemas

  • Real-time Events: WebSocket-like real-time updates

Data Flow:

  1. User sends a message via HTTP API

  2. System queries AWS Knowledge Base for AI response leveraged via AWS Bedrock and receives the response from LLMs

  3. Message pair (user + LLM response) gets queued in SQS

  4. Worker pool processes messages asynchronously

  5. Messages are persisted to database with proper transactions

  6. Real-time events notify connected clients via Pusher

The Concurrency Challenge

The biggest challenge was handling the dual nature of chat systems:

  • Synchronous: Users expect immediate responses

  • Asynchronous: Message persistence can happen later

Here's how I solved it using Go's concurrency primitives.

Asynchronous Message Processing with Worker Pools

The AWS Client

package aws_cloud

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/smithy-go/logging"
)

type DevEnvironmentConfig struct {
    AccessKey string
    Secret    string
}

type AwsCloudClientArg struct {
    DevEnvironmentCfg  *DevEnvironmentConfig
    LogEnable          bool
    Region             string
    QueueName          string
}

type AWSCloudClients struct {
    region                    string
    sqsClient                 *sqs.Client
    queueUrl                  *string
}

type defaultLogger struct {
    logger *log.Logger
}

/*
    Logf implements the logging.Logger interface required by AWS SDK for Go V2.
    So when the AWS SDK wants to log something, it calls Logf on the defaultLogger, 
    which then forwards that to Go's standard logger using Printf. 
    This way we can use the familiar Go standard logger while satisfying AWS SDK's 
    logging requirements.
*/
func (l defaultLogger) Logf(classification logging.Classification, format string, args ...interface{}) {
    l.logger.Printf(format, args...)
}

func NewAWSCloudClients(
    ctx context.Context,
    arg AwsCloudClientArg,
) (*AWSCloudClients, error) {
    var sdkConfig aws.Config
    var err error
    if arg.DevEnvironmentCfg != nil {
        if arg.LogEnable {
            stdLogger := log.New(os.Stderr, "", log.LstdFlags)
            // Wrap the standard logger in our defaultLogger type.
            loggerWrapper := defaultLogger{logger: stdLogger}
            sdkConfig, err = config.LoadDefaultConfig(
                ctx,
                config.WithRegion(arg.Region),
                config.WithCredentialsProvider(
                    credentials.NewStaticCredentialsProvider(arg.DevEnvironmentCfg.AccessKey, arg.DevEnvironmentCfg.Secret, ""),
                ),
                config.WithClientLogMode(aws.LogRequestWithBody|aws.LogResponseWithBody),
                config.WithLogger(loggerWrapper),
            )
            if err != nil {
                return nil, fmt.Errorf("error loading development aws sdk configuration: %w", err)
            }
        } else {
            sdkConfig, err = config.LoadDefaultConfig(
                ctx,
                config.WithRegion(arg.Region),
                config.WithCredentialsProvider(
                    credentials.NewStaticCredentialsProvider(arg.DevEnvironmentCfg.AccessKey, arg.DevEnvironmentCfg.Secret, ""),
                ),
            )
            if err != nil {
                return nil, fmt.Errorf("error loading development aws sdk configuration: %w", err)
            }
        }
    } else {
        sdkConfig, err = config.LoadDefaultConfig(
            ctx,
            config.WithRegion(arg.Region),
        )
        if err != nil {
            return nil, fmt.Errorf("error loading aws sdk configuration in production: %w", err)
        }
    }

    sqsClient, queueUrl, err := newSqsClient(ctx, sdkConfig, arg.QueueName)
    if err != nil {
        return nil, fmt.Errorf("error creating new sqs client: %w", err)
    }

    return &AWSCloudClients{
        region:                    arg.Region,
        sqsClient:                 sqsClient,
        queueUrl:                  queueUrl,
    }, nil

}

func newSqsClient(ctx context.Context, config aws.Config, queueName string) (*sqs.Client, *string, error) {

    sqsClient := *sqs.NewFromConfig(config)

    result, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
        QueueName: &queueName,
    })

    if err != nil {
        return nil, nil, err
    }

    return &sqsClient, result.QueueUrl, nil
}

SQS Operations

package aws_cloud

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/swagflo/chatbot_engine/internal/types"
)


type MessageRole string

const (
    MessageRoleUser       MessageRole = "USER"
    MessageRoleAssistant  MessageRole = "LLM"
    MessageRoleHumanAgent MessageRole = "HUMAN-AGENT"
)

type RoleBasedMessage struct {
    Role      MessageRole `json:"role"`
    Content   string      `json:"content"`
    Timestamp time.Time   `json:"timestamp"`
}


type SqsMessageStructure struct {
    WorkspaceID       uint                    `json:"workspace_id"`
    ConversationID    int64                   `json:"conversation_id"`
    StoreId           int                     `json:"store_id"`
    UserMessage       types.RoleBasedMessage  `json:"user_message"`
    AssistantMessage  types.RoleBasedMessage  `json:"assistant_message"`
    HumanAgentMessage *types.RoleBasedMessage `json:"human_agent_message"`
    HandoffMessages   bool                    `json:"handoff_messages"`
    SessionId         *string                 `json:"session_id"`
    ReceiptHandle     *string                 `json:"receipt_handle"`
}

func (awsClient *AWSCloudClients) SendMessage(
    ctx context.Context,
    message SqsMessageStructure,
) error {

    jsonMessage, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("error marshalling message to json: %w", err)
    }

    _, err = awsClient.sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
        QueueUrl:    awsClient.queueUrl,
        MessageBody: aws.String(string(jsonMessage)),
    })
    if err != nil {
        return fmt.Errorf("error sending message: %w", err)
    }

    return nil
}

func (awsClient *AWSCloudClients) ReceiveMessage(
    ctx context.Context,
) ([]SqsMessageStructure, error) {

    var messages []SqsMessageStructure

    receiveMessageParams := sqs.ReceiveMessageInput{
        QueueUrl:          awsClient.queueUrl,
        WaitTimeSeconds:   *aws.Int32(10),
        VisibilityTimeout: *aws.Int32(60),
    }

    messageOutput, err := awsClient.sqsClient.ReceiveMessage(ctx, &receiveMessageParams)
    if err != nil {
        return nil, fmt.Errorf("error receiving message: %w", err)
    }

    if len(messageOutput.Messages) != 0 {
        for _, item := range messageOutput.Messages {
            var message SqsMessageStructure
            err := json.Unmarshal([]byte(*item.Body), &message)
            if err != nil {
                return nil, fmt.Errorf("error unmarshalling the message: %w", err)
            }
            message.ReceiptHandle = item.ReceiptHandle
            messages = append(messages, message)
        }
    }

    return messages, nil
}

func (awsClient *AWSCloudClients) DeleteMessage(
    ctx context.Context,
    receiptHandle *string,
) error {
    _, err := awsClient.sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
        QueueUrl:      awsClient.queueUrl,
        ReceiptHandle: receiptHandle,
    })
    if err != nil {
        return err
    }

    return nil
}

The Consumer Architecture

type ConsumerOp struct {
    awsClient  *aws_cloud.AWSCloudClients
    store      database.Store // We are leveraging SQLC for managing database operations
    baseLogger *logger.Logger
}

func NewConsumer(
    awsClient *aws_cloud.AWSCloudClients,
    store database.Store,
    baseLogger *logger.Logger,
) *ConsumerOp {
    return &ConsumerOp{
        awsClient:  awsClient,
        store:      store,
        baseLogger: baseLogger,
    }
}

func (cop *ConsumerOp) ProcessMessage(ctx context.Context, numWorkers int) error {
    // Buffered channel acts as a job queue
    jobs := make(chan aws_cloud.SqsMessageStructure, numWorkers*2)

    var wg sync.WaitGroup

    // Spawn multiple workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            cop.worker(ctx, id, jobs)
        }(i)
    }

    // Message polling goroutine
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                messages, err := cop.awsClient.ReceiveMessage(ctx)
                if err != nil {
                    cop.baseLogger.Error().Err(err).Msg("error polling SQS")
                    continue
                }

                // Feed jobs to workers
                for _, msg := range messages {
                    jobs <- msg
                }
            }
        }
    }()

    // Graceful shutdown
    <-ctx.Done()
    close(jobs) // Closing the buffered channel
    wg.Wait() // Waiting for goroutines to complete
    return nil
}

Worker Implementation

func (cop *ConsumerOp) worker(ctx context.Context, id int, jobs <-chan aws_cloud.SqsMessageStructure) {
    workerLogger := cop.baseLogger.With().Int("worker_id", id).Logger()

    for {
        select {
        case <-ctx.Done():
            workerLogger.Info().Msg("Worker shutting down")
            return
        case msg, ok := <-jobs:
            if !ok {
                return // Channel closed
            }

            // Process message atomically
            err := cop.processSingleMessage(ctx, msg)
            if err != nil {
                workerLogger.Error().Err(err).Msg("failed to process message")
                continue
            }

            // Only delete from queue if processing succeeded
            deleteErr := cop.awsClient.DeleteMessage(ctx, msg.ReceiptHandle)
            if deleteErr != nil {
                workerLogger.Error().Err(deleteErr).Msg("CRITICAL: Message processed but failed to delete")
            }
        }
    }
}

Processing Individual Messages

func (cop *ConsumerOp) processSingleMessage(ctx context.Context, message aws_cloud.SqsMessageStructure) error {
    if message.HandoffMessages {
        // Logic written here basically writes to database
        args := database.CreateMessageParams{
            ConversationID: uint64(message.ConversationID),
        }
        if message.HumanAgentMessage != nil {
            args.Role = database.AwsMessagesRoleHUMANAGENT
            args.Content = message.HumanAgentMessage.Content
            args.CreatedAt = message.HumanAgentMessage.Timestamp
        } else {
            args.Role = database.AwsMessagesRoleUSER
            args.Content = message.UserMessage.Content
            args.CreatedAt = message.UserMessage.Timestamp
        }
        _, err := cop.store.CreateMessage(ctx, args)
        cop.baseLogger.Error().Err(err).Msg("failed to persist handoff message")
        return err
    } else {
        // PersistContinuedConversation basically writes messages to database in transaction
        err := cop.store.PersistContinuedConversation(
            ctx,
            database.PersistContinuedConversationArgs{
                WorkspaceId:      int(message.WorkspaceID),
                ConversationId:   int(message.ConversationID),
                UserMessage:      message.UserMessage,
                AssistantMessage: message.AssistantMessage,
                SessionId:        message.SessionId,
            },
        )
        if err != nil {
            if errors.Is(err, custom_errors.ErrDuplicatedKey) {
                cop.baseLogger.Error().Err(err).Msg("record already exists")
                return nil
            }
            cop.baseLogger.Error().Err(err).Msg("failed to persist continued conversation messages")
            return err
        }
    }

    return nil
}

Graceful Shutdown with Context

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    group, groupCtx := errgroup.WithContext(ctx)

    // HTTP Server
    group.Go(func() error {
        err := srv.ListenAndServe()
        if err != nil && !errors.Is(err, http.ErrServerClosed) {
            return err
        }
        return nil
    })

    // Message Consumer
    group.Go(func() error {
        return consumer.ProcessMessage(groupCtx, 2) // 2 workers
    })

    // Graceful shutdown coordinator
    group.Go(func() error {
        <-groupCtx.Done()

        shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
        defer cancel()

        return srv.Shutdown(shutdownCtx)
    })

    // Wait for all goroutines
    if err := group.Wait(); err != nil && !errors.Is(err, context.Canceled) {
        log.Fatal(err)
    }
}

Note: I have not provided the complete code this are snippets from the application which are essential enough to understand the concept.

Performance Insights

Throughput Improvements:

  • Before: Sequential processing ~100 messages/second

  • After: Concurrent processing ~2,000+ messages/second

Key Optimizations:

  1. Buffered Channels: Prevent goroutine blocking

  2. Worker Pools: Limit concurrent database connections

  3. Async Queuing: Decouple API response from persistence

  4. Batch Processing: Process multiple SQS messages together

Lessons Learned

1. Channel Sizing Matters

// Too small: goroutines block
jobs := make(chan Message, 1)

// Too large: memory waste
jobs := make(chan Message, 10000)

// Just right: 2x worker count
jobs := make(chan Message, numWorkers*2)

2. Error Handling is Critical

Never delete a message from the queue unless you're 100% sure it's been processed successfully.

3. Context Propagation

Always pass context through your entire call stack for proper cancellation.

Architectural Limitations: The Reality Check

While this system handles thousands to hundreds of thousands of concurrent conversations efficiently, it has fundamental limitations that become apparent at millions to billions of messages scale. Let me be transparent about these constraints:

1. Tightly Coupled Consumer-Server Architecture

The biggest limitation is that the consumer and HTTP server are co-located in the same process.

func main() {
    // Both running in same process - can't scale independently
    group.Go(func() error {
        return srv.ListenAndServe() // HTTP Server
    })

    group.Go(func() error {
        return consumer.ProcessMessage(groupCtx, 2) // Consumer
    })
}

Problems this creates:

  • Single Point of Failure: If the server crashes, message processing stops

  • Resource Contention: HTTP handling and message processing compete for CPU/memory

  • Inflexible Scaling: Can't add more consumers without adding more API servers

2. The Horizontal Scaling Paradox

Even though we "decoupled" with SQS, we're still architecturally coupled

// This design prevents true horizontal scaling
type Server struct {
    store      database.Store     // Shared database dependency
    awsClient  *aws_cloud.Client  // Shared AWS resources
    consumer   *consumer.Consumer // Embedded consumer
}

Why this fails at millions of messages:

  • Database Sharding: Can't easily partition data across multiple DBs

  • Consumer Distribution: All consumers compete for the same SQS queue

  • State Management: Session management becomes complex across instances

What This Architecture is Good For

Sweet Spot: 10K - 500K concurrent conversations

  • Startups and medium-scale applications

  • Internal tools and enterprise chat systems

  • Prototyping and MVP development

  • Systems where operational simplicity > infinite scale

Key Takeways

Building scalable chat systems requires thinking asynchronously by default. The patterns I've shared here - worker pools, message queues, and graceful shutdown - are foundational for any high-throughput Go application.

The combination of Go's excellent concurrency primitives with cloud-native message queuing creates a robust foundation that can handle real-world chat workloads while maintaining consistency and reliability.

0
Subscribe to my newsletter

Read articles from Farhan Khoja directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Farhan Khoja
Farhan Khoja