Building a Scalable Chat System: My Learnings with Concurrency and Async Message Processing in Go

The Challenge: Real-Time Chat Message Storage at Scale
Building a production-ready chat system that handles thousands of concurrent conversations isn't just about storing messages in a database. It's about orchestrating multiple components that work together seamlessly while maintaining performance, reliability, and data consistency.
Recently, I architected and built a distributed chat system that handles real-time conversations with AI assistants, and I want to share the key insights around concurrency patterns and asynchronous message processing that made it scale.
System Architecture Overview
Core Components:
HTTP API Server: Handles incoming chat requests
AWS Knowledge Base Integration: Processes user queries against vector databases
SQS Message Queue: Decouples message processing from API responses
Pusher: Real-time communication channel between the user and the server
Worker Pool: Concurrent message processors
MySQL Database: Persistent storage with optimized schemas
Real-time Events: WebSocket-like real-time updates
Data Flow:
User sends a message via HTTP API
The system queries the AWS Knowledge Base via AWS Bedrock and receives a response from the LLM
Message pair (user + LLM response) gets queued in SQS
Worker pool processes messages asynchronously
Messages are persisted to database with proper transactions
Real-time events notify connected clients via Pusher (a condensed handler sketch follows)
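To make the flow concrete, here's a condensed sketch of the request path. Server, ChatRequest, and the bedrock/pusher fields are illustrative stand-ins, not the production code:
func (s *Server) handleChatMessage(w http.ResponseWriter, r *http.Request) {
    var req ChatRequest // hypothetical payload: Message, WorkspaceID, ConversationID
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid payload", http.StatusBadRequest)
        return
    }
    // Step 2 (synchronous): query the Knowledge Base via Bedrock.
    answer, err := s.bedrock.Query(r.Context(), req.Message)
    if err != nil {
        http.Error(w, "upstream error", http.StatusBadGateway)
        return
    }
    now := time.Now()
    // Step 3 (asynchronous): enqueue the message pair; persistence happens later.
    if err := s.awsClient.SendMessage(r.Context(), aws_cloud.SqsMessageStructure{
        WorkspaceID:      req.WorkspaceID,
        ConversationID:   req.ConversationID,
        UserMessage:      aws_cloud.RoleBasedMessage{Role: aws_cloud.MessageRoleUser, Content: req.Message, Timestamp: now},
        AssistantMessage: aws_cloud.RoleBasedMessage{Role: aws_cloud.MessageRoleAssistant, Content: answer, Timestamp: now},
    }); err != nil {
        // The user still gets their answer; the gap is caught by logs/alerts.
        s.logger.Error().Err(err).Msg("failed to enqueue message pair")
    }
    // Step 6: push a real-time event, then respond immediately.
    s.pusher.Trigger("conversation", "new-message", answer)
    json.NewEncoder(w).Encode(map[string]string{"answer": answer})
}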
The Concurrency Challenge
The biggest challenge was handling the dual nature of chat systems:
Synchronous: Users expect immediate responses
Asynchronous: Message persistence can happen later
Here's how I solved it using Go's concurrency primitives.
Asynchronous Message Processing with Worker Pools
The AWS Client
package aws_cloud

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/smithy-go/logging"
)

type DevEnvironmentConfig struct {
    AccessKey string
    Secret    string
}

type AwsCloudClientArg struct {
    DevEnvironmentCfg *DevEnvironmentConfig
    LogEnable         bool
    Region            string
    QueueName         string
}

type AWSCloudClients struct {
    region    string
    sqsClient *sqs.Client
    queueUrl  *string
}

type defaultLogger struct {
    logger *log.Logger
}

/*
Logf implements the logging.Logger interface required by the AWS SDK for Go v2.
When the SDK wants to log something, it calls Logf on the defaultLogger, which
forwards the call to Go's standard logger via Printf. This way we can use the
familiar standard logger while satisfying the SDK's logging interface.
*/
func (l defaultLogger) Logf(classification logging.Classification, format string, args ...interface{}) {
    l.logger.Printf(format, args...)
}
func NewAWSCloudClients(
    ctx context.Context,
    arg AwsCloudClientArg,
) (*AWSCloudClients, error) {
    // Build the option list incrementally instead of duplicating
    // config.LoadDefaultConfig calls per environment.
    opts := []func(*config.LoadOptions) error{
        config.WithRegion(arg.Region),
    }
    if arg.DevEnvironmentCfg != nil {
        // Development: static credentials from the local environment.
        opts = append(opts, config.WithCredentialsProvider(
            credentials.NewStaticCredentialsProvider(arg.DevEnvironmentCfg.AccessKey, arg.DevEnvironmentCfg.Secret, ""),
        ))
        if arg.LogEnable {
            stdLogger := log.New(os.Stderr, "", log.LstdFlags)
            // Wrap the standard logger in our defaultLogger type.
            opts = append(opts,
                config.WithClientLogMode(aws.LogRequestWithBody|aws.LogResponseWithBody),
                config.WithLogger(defaultLogger{logger: stdLogger}),
            )
        }
    }
    // Production falls through with region only, relying on the SDK's
    // default credential chain (IAM role, environment variables, etc.).
    sdkConfig, err := config.LoadDefaultConfig(ctx, opts...)
    if err != nil {
        return nil, fmt.Errorf("error loading aws sdk configuration: %w", err)
    }
    sqsClient, queueUrl, err := newSqsClient(ctx, sdkConfig, arg.QueueName)
    if err != nil {
        return nil, fmt.Errorf("error creating new sqs client: %w", err)
    }
    return &AWSCloudClients{
        region:    arg.Region,
        sqsClient: sqsClient,
        queueUrl:  queueUrl,
    }, nil
}
func newSqsClient(ctx context.Context, cfg aws.Config, queueName string) (*sqs.Client, *string, error) {
    // NewFromConfig already returns a pointer; no need to copy the client.
    sqsClient := sqs.NewFromConfig(cfg)
    result, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
        QueueName: &queueName,
    })
    if err != nil {
        return nil, nil, err
    }
    return sqsClient, result.QueueUrl, nil
}
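Wiring the client up looks roughly like this; the queue name and environment-variable handling are illustrative, not lifted from the production code:
// Development: static credentials with SDK request/response logging enabled.
awsClient, err := aws_cloud.NewAWSCloudClients(ctx, aws_cloud.AwsCloudClientArg{
    DevEnvironmentCfg: &aws_cloud.DevEnvironmentConfig{
        AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
        Secret:    os.Getenv("AWS_SECRET_ACCESS_KEY"),
    },
    LogEnable: true,
    Region:    "us-east-1",
    QueueName: "chat-message-queue", // illustrative
})
if err != nil {
    log.Fatalf("aws client setup: %v", err)
}
// In production, pass DevEnvironmentCfg: nil and the client falls back to
// the SDK's default credential chain (IAM role, env vars, etc.).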
SQS Operations
package aws_cloud

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

type MessageRole string

const (
    MessageRoleUser       MessageRole = "USER"
    MessageRoleAssistant  MessageRole = "LLM"
    MessageRoleHumanAgent MessageRole = "HUMAN-AGENT"
)

type RoleBasedMessage struct {
    Role      MessageRole `json:"role"`
    Content   string      `json:"content"`
    Timestamp time.Time   `json:"timestamp"`
}

type SqsMessageStructure struct {
    WorkspaceID       uint              `json:"workspace_id"`
    ConversationID    int64             `json:"conversation_id"`
    StoreId           int               `json:"store_id"`
    UserMessage       RoleBasedMessage  `json:"user_message"`
    AssistantMessage  RoleBasedMessage  `json:"assistant_message"`
    HumanAgentMessage *RoleBasedMessage `json:"human_agent_message"`
    HandoffMessages   bool              `json:"handoff_messages"`
    SessionId         *string           `json:"session_id"`
    ReceiptHandle     *string           `json:"receipt_handle"`
}
func (awsClient *AWSCloudClients) SendMessage(
    ctx context.Context,
    message SqsMessageStructure,
) error {
    jsonMessage, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("error marshalling message to json: %w", err)
    }
    _, err = awsClient.sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
        QueueUrl:    awsClient.queueUrl,
        MessageBody: aws.String(string(jsonMessage)),
    })
    if err != nil {
        return fmt.Errorf("error sending message: %w", err)
    }
    return nil
}
func (awsClient *AWSCloudClients) ReceiveMessage(
    ctx context.Context,
) ([]SqsMessageStructure, error) {
    var messages []SqsMessageStructure
    receiveMessageParams := sqs.ReceiveMessageInput{
        QueueUrl:            awsClient.queueUrl,
        MaxNumberOfMessages: 10, // batch: SQS returns up to 10 messages per call
        WaitTimeSeconds:     10, // long polling reduces empty responses
        VisibilityTimeout:   60, // seconds a message stays hidden while in flight
    }
    messageOutput, err := awsClient.sqsClient.ReceiveMessage(ctx, &receiveMessageParams)
    if err != nil {
        return nil, fmt.Errorf("error receiving message: %w", err)
    }
    for _, item := range messageOutput.Messages {
        var message SqsMessageStructure
        if err := json.Unmarshal([]byte(*item.Body), &message); err != nil {
            return nil, fmt.Errorf("error unmarshalling the message: %w", err)
        }
        message.ReceiptHandle = item.ReceiptHandle
        messages = append(messages, message)
    }
    return messages, nil
}
func (awsClient *AWSCloudClients) DeleteMessage(
    ctx context.Context,
    receiptHandle *string,
) error {
    _, err := awsClient.sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
        QueueUrl:      awsClient.queueUrl,
        ReceiptHandle: receiptHandle,
    })
    return err
}
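Before layering a worker pool on top, it helps to see the lifecycle these three operations form. A minimal single-threaded sketch, where handle is a placeholder for whatever processing you need:
// Poll → process → delete. On a processing error the message is NOT deleted,
// so SQS redelivers it once the visibility timeout (60s above) expires.
for {
    msgs, err := awsClient.ReceiveMessage(ctx)
    if err != nil {
        log.Printf("receive: %v", err)
        continue
    }
    for _, m := range msgs {
        if err := handle(ctx, m); err != nil { // placeholder processor
            continue // leave the message on the queue for redelivery
        }
        if err := awsClient.DeleteMessage(ctx, m.ReceiptHandle); err != nil {
            log.Printf("delete: %v", err)
        }
    }
}
The worker pool below parallelizes exactly this loop.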
The Consumer Architecture
type ConsumerOp struct {
    awsClient  *aws_cloud.AWSCloudClients
    store      database.Store // we leverage SQLC for managing database operations
    baseLogger *logger.Logger
}

func NewConsumer(
    awsClient *aws_cloud.AWSCloudClients,
    store database.Store,
    baseLogger *logger.Logger,
) *ConsumerOp {
    return &ConsumerOp{
        awsClient:  awsClient,
        store:      store,
        baseLogger: baseLogger,
    }
}
func (cop *ConsumerOp) ProcessMessage(ctx context.Context, numWorkers int) error {
    // Buffered channel acts as a job queue.
    jobs := make(chan aws_cloud.SqsMessageStructure, numWorkers*2)
    var wg sync.WaitGroup

    // Spawn multiple workers.
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            cop.worker(ctx, id, jobs)
        }(i)
    }

    // Message polling goroutine. It owns the jobs channel: closing it here,
    // on the sending side, avoids a send-on-closed-channel panic at shutdown.
    go func() {
        defer close(jobs)
        for {
            select {
            case <-ctx.Done():
                return
            default:
                messages, err := cop.awsClient.ReceiveMessage(ctx)
                if err != nil {
                    cop.baseLogger.Error().Err(err).Msg("error polling SQS")
                    continue
                }
                // Feed jobs to workers; bail out if shutdown starts mid-batch.
                for _, msg := range messages {
                    select {
                    case jobs <- msg:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }
    }()

    // Graceful shutdown: wait for cancellation, then for in-flight workers.
    <-ctx.Done()
    wg.Wait()
    return nil
}
Worker Implementation
func (cop *ConsumerOp) worker(ctx context.Context, id int, jobs <-chan aws_cloud.SqsMessageStructure) {
    workerLogger := cop.baseLogger.With().Int("worker_id", id).Logger()
    for {
        select {
        case <-ctx.Done():
            workerLogger.Info().Msg("Worker shutting down")
            return
        case msg, ok := <-jobs:
            if !ok {
                return // channel closed, no more jobs
            }
            // Process the message; on failure it stays on the queue and
            // reappears after the visibility timeout expires.
            err := cop.processSingleMessage(ctx, msg)
            if err != nil {
                workerLogger.Error().Err(err).Msg("failed to process message")
                continue
            }
            // Only delete from the queue if processing succeeded.
            deleteErr := cop.awsClient.DeleteMessage(ctx, msg.ReceiptHandle)
            if deleteErr != nil {
                workerLogger.Error().Err(deleteErr).Msg("CRITICAL: message processed but failed to delete")
            }
        }
    }
}
Processing Individual Messages
func (cop *ConsumerOp) processSingleMessage(ctx context.Context, message aws_cloud.SqsMessageStructure) error {
    if message.HandoffMessages {
        // Handoff path: persist a single message row.
        args := database.CreateMessageParams{
            ConversationID: uint64(message.ConversationID),
        }
        if message.HumanAgentMessage != nil {
            args.Role = database.AwsMessagesRoleHUMANAGENT
            args.Content = message.HumanAgentMessage.Content
            args.CreatedAt = message.HumanAgentMessage.Timestamp
        } else {
            args.Role = database.AwsMessagesRoleUSER
            args.Content = message.UserMessage.Content
            args.CreatedAt = message.UserMessage.Timestamp
        }
        if _, err := cop.store.CreateMessage(ctx, args); err != nil {
            cop.baseLogger.Error().Err(err).Msg("failed to persist handoff message")
            return err
        }
        return nil
    }

    // PersistContinuedConversation writes the message pair to the database in a transaction.
    err := cop.store.PersistContinuedConversation(
        ctx,
        database.PersistContinuedConversationArgs{
            WorkspaceId:      int(message.WorkspaceID),
            ConversationId:   int(message.ConversationID),
            UserMessage:      message.UserMessage,
            AssistantMessage: message.AssistantMessage,
            SessionId:        message.SessionId,
        },
    )
    if err != nil {
        if errors.Is(err, custom_errors.ErrDuplicatedKey) {
            // Duplicate delivery (SQS is at-least-once): treat as success so the message gets deleted.
            cop.baseLogger.Error().Err(err).Msg("record already exists")
            return nil
        }
        cop.baseLogger.Error().Err(err).Msg("failed to persist continued conversation messages")
        return err
    }
    return nil
}
Graceful Shutdown with Context
func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()
    group, groupCtx := errgroup.WithContext(ctx)
    // HTTP Server
    group.Go(func() error {
        err := srv.ListenAndServe()
        if err != nil && !errors.Is(err, http.ErrServerClosed) {
            return err
        }
        return nil
    })
    // Message Consumer
    group.Go(func() error {
        return consumer.ProcessMessage(groupCtx, 2) // 2 workers
    })
    // Graceful shutdown coordinator
    group.Go(func() error {
        <-groupCtx.Done()
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
        defer cancel()
        return srv.Shutdown(shutdownCtx)
    })
    // Wait for all goroutines
    if err := group.Wait(); err != nil && !errors.Is(err, context.Canceled) {
        log.Fatal(err)
    }
}
Note: This is not the complete code; these are snippets from the application, but they cover enough to understand the concepts.
Performance Insights
Throughput Improvements:
Before: Sequential processing ~100 messages/second
After: Concurrent processing ~2,000+ messages/second
Key Optimizations:
Buffered Channels: Prevent goroutine blocking
Worker Pools: Limit concurrent database connections (see the sketch after this list)
Async Queuing: Decouple API response from persistence
Batch Processing: Process multiple SQS messages together
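The worker-pool point deserves a concrete illustration: since only numWorkers goroutines touch MySQL at once, the connection pool can be capped to match. A minimal sketch using database/sql; the exact numbers are illustrative:
// Cap the MySQL pool so the worker pool can never exhaust connections.
db, err := sql.Open("mysql", dsn) // dsn comes from config
if err != nil {
    log.Fatal(err)
}
db.SetMaxOpenConns(numWorkers + 2) // workers plus headroom for the HTTP path
db.SetMaxIdleConns(numWorkers)
db.SetConnMaxLifetime(5 * time.Minute)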
Lessons Learned
1. Channel Sizing Matters
// Too small: goroutines block
jobs := make(chan Message, 1)
// Too large: memory waste
jobs := make(chan Message, 10000)
// Just right: 2x worker count
jobs := make(chan Message, numWorkers*2)
2. Error Handling is Critical
Never delete a message from the queue unless you're 100% sure it's been processed successfully.
3. Context Propagation
Always pass context through your entire call stack for proper cancellation.
Architectural Limitations: The Reality Check
While this system handles thousands to hundreds of thousands of concurrent conversations efficiently, it has fundamental limitations that become apparent at the scale of millions to billions of messages. Let me be transparent about these constraints:
1. Tightly Coupled Consumer-Server Architecture
The biggest limitation is that the consumer and HTTP server are co-located in the same process.
func main() {
    // Both running in the same process - they can't scale independently.
    group.Go(func() error {
        return srv.ListenAndServe() // HTTP Server
    })
    group.Go(func() error {
        return consumer.ProcessMessage(groupCtx, 2) // Consumer
    })
}
Problems this creates:
Single Point of Failure: If the server crashes, message processing stops
Resource Contention: HTTP handling and message processing compete for CPU/memory
Inflexible Scaling: Can't add more consumers without adding more API servers (a sketch of the split follows)
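The natural fix is to split the consumer into its own binary so it deploys and scales separately from the API fleet. A hypothetical standalone entry point, reusing the same ConsumerOp (store and logger wiring omitted):
// cmd/consumer/main.go - hypothetical standalone consumer binary.
func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    awsClient, err := aws_cloud.NewAWSCloudClients(ctx, aws_cloud.AwsCloudClientArg{
        Region:    os.Getenv("AWS_REGION"),
        QueueName: os.Getenv("SQS_QUEUE_NAME"),
    })
    if err != nil {
        log.Fatal(err)
    }

    // store and baseLogger would be wired exactly as in the main service.
    consumer := NewConsumer(awsClient, store, baseLogger)

    // Scale workers here without touching a single API server.
    if err := consumer.ProcessMessage(ctx, 8); err != nil {
        log.Fatal(err)
    }
}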
2. The Horizontal Scaling Paradox
Even though we "decoupled" with SQS, we're still architecturally coupled:
// This design prevents true horizontal scaling
type Server struct {
    store     database.Store             // Shared database dependency
    awsClient *aws_cloud.AWSCloudClients // Shared AWS resources
    consumer  *consumer.ConsumerOp       // Embedded consumer
}
Why this fails at millions of messages:
Database Sharding: Can't easily partition data across multiple DBs
Consumer Distribution: All consumers compete for the same SQS queue
State Management: Session management becomes complex across instances
What This Architecture is Good For
Sweet Spot: 10K - 500K concurrent conversations
Startups and medium-scale applications
Internal tools and enterprise chat systems
Prototyping and MVP development
Systems where operational simplicity > infinite scale
Key Takeaways
Building scalable chat systems requires thinking asynchronously by default. The patterns I've shared here - worker pools, message queues, and graceful shutdown - are foundational for any high-throughput Go application.
The combination of Go's excellent concurrency primitives with cloud-native message queuing creates a robust foundation that can handle real-world chat workloads while maintaining consistency and reliability.