A Practical Approach to Learning Memory Optimization with Goroutines


Introduction
After writing Tenets of Multithreading in Go: Detailed Tutorial, I felt compelled to share a problem I had to tackle during my work as a software engineer. Real-life software engineering often involves solving unique challenges, each with its own narrative.
In this article, we will analyze four different approaches to a specific problem step-by-step. We will evaluate each solution, weighing the pros and cons, to demonstrate that there is no “one-size-fits-all” approach, as the best solution depends on the circumstances at hand.
By the end, we will compare all four approaches and make an informed choice based on our case.
The problem
The problem itself is not overly complicated. We had an NFS file location where numerous CSV files were being generated by an upstream process over which we had no control.
The task was to set up a cron job that would execute at regular intervals to check for newly created files. If new files were found, the job would upload their contents into a database and subsequently move the files to an archive location. A new instance of the cron job would be launched for each file detected. The requirement was to create a Go program to perform the cron job tasks.
Code Walkthrough
To test the various approaches, I created an API application in Go. Here is the repository. When you run the application for the first time, it creates a sample CSV file with 10 million records (approximately 0.5 GB).
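The seed-file generator itself lives in the repo and is not reproduced in this article. As a rough sketch of how such a large CSV could be produced, assuming illustrative column names rather than the repo's actual schema:

package main

import (
    "encoding/csv"
    "fmt"
    "log"
    "os"
    "path/filepath"
)

// generateSampleCSV writes a header row plus n synthetic records to path.
// The column names are placeholders for illustration only; the real feed
// file has its own schema.
func generateSampleCSV(path string, n int) error {
    if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
        return err
    }
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()

    w := csv.NewWriter(f)
    if err := w.Write([]string{"id", "name", "area", "price"}); err != nil {
        return err
    }
    for i := 1; i <= n; i++ {
        row := []string{
            fmt.Sprintf("%d", i),
            fmt.Sprintf("land-%d", i),
            fmt.Sprintf("%d", 100+i%900),
            fmt.Sprintf("%d", 1000+i%9000),
        }
        if err := w.Write(row); err != nil {
            return err
        }
    }
    w.Flush()
    return w.Error()
}

func main() {
    if err := generateSampleCSV("data/land_feed.csv", 10_000_000); err != nil {
        log.Fatal(err)
    }
}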
The API provides four endpoints:
GET /solution-one
GET /solution-two
GET /solution-three
GET /solution-four
All of these endpoints perform the same tasks:
Read all lines from the file.
Convert each line into a read entity.
Transform the read entities into write entities.
Insert the write entities into the database.
The application uses GORM as the ORM to simplify bulk inserts, configured to insert in batches of 1,000. Each endpoint returns a response that includes:
Time taken for the solution.
Memory consumed for the operation.
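The instrumentation that produces these two figures is part of the repo's handlers and is not shown in the article. A minimal sketch of how they might be captured, assuming the allocation delta from runtime.MemStats is what backs memoryUsage (the repo may compute it differently):

package metrics

import (
    "runtime"
    "time"
)

// Measure runs fn and reports wall-clock time plus bytes allocated while it ran.
// Using the delta of MemStats.TotalAlloc is one plausible source for a
// "memoryUsage" figure; this is an assumption, not the repo's exact method.
func Measure(fn func() error) (elapsed time.Duration, allocated uint64, err error) {
    var before, after runtime.MemStats
    runtime.ReadMemStats(&before)

    start := time.Now()
    err = fn()
    elapsed = time.Since(start)

    runtime.ReadMemStats(&after)
    allocated = after.TotalAlloc - before.TotalAlloc
    return elapsed, allocated, err
}

A handler can wrap each solution call in Measure and serialize the two numbers into the JSON responses shown below.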
The database used in the application is SQLite, which allows only one connection at a time, creating a bottleneck during write operations. With a database such as PostgreSQL, the readings for the different solutions, especially the multithreaded ones, would likely differ.
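One way such a single-connection setup could be wired with GORM and the SQLite driver is sketched below; the connection-pool cap is my assumption about how the constraint is enforced, not code from the repo:

package db

import (
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

// Open connects GORM to a SQLite file and caps the pool at a single
// connection, which is what turns writes into a serial bottleneck.
func Open(path string) (*gorm.DB, error) {
    gdb, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
    if err != nil {
        return nil, err
    }
    sqlDB, err := gdb.DB()
    if err != nil {
        return nil, err
    }
    sqlDB.SetMaxOpenConns(1) // SQLite handles one writer at a time
    return gdb, nil
}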
Solution 1: Brute force
The brute force method involves the following steps:
Read all lines from the file into memory.
Convert the lines into read entities and hold them in memory.
Transform all read entities into write entities.
Insert all write entities into the database.
Here’s the straightforward code:
func LoadAllAndInsert(database *db.DB) error {
    landReadModels, err := utils.ReadCSVAll("data/land_feed.csv")
    if len(err) > 0 {
        log.Println(err)
        return errors.New("Some error happened. Check logs.")
    }
    var dbLandModels []*models.Land
    for _, landReadModel := range landReadModels {
        dbLandModels = append(dbLandModels, models.FromReadModel(landReadModel))
    }
    return database.CreateLands(dbLandModels)
}
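utils.ReadCSVAll lives in the repo and is not reproduced in the article. Judging from how it is called above (it returns the parsed models plus a slice of errors), it might look roughly like this sketch, reusing the CreateEntityFromRecord helper that appears in the later solutions (called without the package prefix here, assuming both live in utils):

// ReadCSVAll loads the whole file into memory at once, parses every record,
// and returns the read models plus any per-line errors. This is a sketch of
// the repo helper, not its actual implementation.
func ReadCSVAll(path string) ([]models.LandRead, []error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, []error{err}
    }
    defer file.Close()

    reader := csv.NewReader(file)
    records, err := reader.ReadAll() // every record is held in memory here
    if err != nil {
        return nil, []error{err}
    }

    var landReadModels []models.LandRead
    var errs []error
    for i, record := range records[1:] { // skip the header row
        readEntity, err := CreateEntityFromRecord(record, i+1)
        if err != nil {
            errs = append(errs, err)
            continue
        }
        landReadModels = append(landReadModels, *readEntity)
    }
    return landReadModels, errs
}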
The response received is as follows:
{
  "error": "",
  "message": "Successfully completed request for Load All And Insert In Batches.",
  "memoryUsage": 3296896352,
  "elapsed": 18968041000
}
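For completeness, a handler that produces a response of this shape might look like the sketch below; the handler name, the Response struct, and the reuse of the Measure helper from earlier are all assumptions rather than repo code (imports of net/http and encoding/json omitted for brevity):

// Response mirrors the JSON shape returned by each endpoint in this article.
type Response struct {
    Error       string `json:"error"`
    Message     string `json:"message"`
    MemoryUsage uint64 `json:"memoryUsage"`
    Elapsed     int64  `json:"elapsed"`
}

// solutionOneHandler is a hypothetical wrapper around the first solution.
func solutionOneHandler(database *db.DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        elapsed, mem, err := metrics.Measure(func() error {
            return LoadAllAndInsert(database)
        })
        resp := Response{
            Message:     "Successfully completed request for Load All And Insert In Batches.",
            MemoryUsage: mem,
            Elapsed:     elapsed.Nanoseconds(),
        }
        if err != nil {
            resp.Error = err.Error()
        }
        w.Header().Set("Content-Type", "application/json")
        _ = json.NewEncoder(w).Encode(resp)
    }
}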
Solution 2: Buffered inserts along with reading
This method utilizes buffered inserts while reading:
Read a line from the file and convert it to a read entity.
Convert the read entity into a write entity.
Push the write entity into a buffer.
When the buffer reaches 10,000 entries, perform a bulk insert into the database; otherwise, continue populating the buffer.
Here’s the code:
func ReadLineAndAndInsertInBatches(database *db.DB) error {
    file, err := os.Open("data/land_feed.csv")
    if err != nil {
        return err
    }
    defer file.Close()
    // Create a CSV reader
    reader := csv.NewReader(file)
    // Read the headers
    _, _ = reader.Read()
    lineNumber := 1
    var buffer []*models.Land
    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }
        readEntity, err := utils.CreateEntityFromRecord(record, lineNumber)
        if err != nil {
            return err
        }
        dbEntity := models.FromReadModel(*readEntity)
        buffer = append(buffer, dbEntity)
        if len(buffer) == 10000 {
            if err := database.CreateLands(buffer); err != nil {
                return err
            }
            buffer = buffer[:0]
        }
        lineNumber++
    }
    // Flush any remaining entities in the buffer
    if len(buffer) > 0 {
        if err := database.CreateLands(buffer); err != nil {
            return err
        }
    }
    return nil
}
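database.CreateLands, used in both solutions so far, is also repo code. Given that GORM is configured to insert in batches of 1,000, a minimal sketch might be the following (the shape of the db.DB wrapper is assumed):

package db

import "gorm.io/gorm"

// DB is a thin wrapper around a *gorm.DB handle (wrapper shape assumed).
type DB struct {
    conn *gorm.DB
}

// CreateLands bulk-inserts a slice of land models. CreateInBatches splits the
// slice into batches of 1,000 rows per INSERT, matching the configuration
// described earlier in the article.
func (d *DB) CreateLands(lands any) error {
    return d.conn.CreateInBatches(lands, 1000).Error
}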
The response received is as follows:
{
  "error": "",
  "message": "Successfully completed request for Read Line And Insert In Batches.",
  "memoryUsage": 3851704,
  "elapsed": 26142007900
}
Solution 3: Using goroutines with a worker pool
To further optimize the process, I implemented goroutines with a worker pool approach. This method employs two channels: readChannel for read entities and doneChannel for signaling completion. The process consists of the following:
A Producer goroutine reads each line from the CSV file, converts it into a read entity, and sends it to the readChannel.
Five Consumer goroutines listen to the readChannel, convert each read entity into a write entity, and maintain a buffer of 10,000 write entities. When the buffer is full, it is written to the database and then cleared.
Once all consumer goroutines finish, the doneChannel is closed, allowing the parent method to exit.
Here’s the implementation:
func MultiprocessingForReadingAndWriting(database *db.DB) error {
    readCh := make(chan *models.LandRead, 1000)
    doneCh := make(chan struct{})
    errCh := make(chan error)
    const numWriters = 5 // Number of writing goroutines
    go readAndProduceModelAsync(readCh, errCh)
    var wg sync.WaitGroup
    var mutex sync.Mutex
    for i := 0; i < numWriters; i++ {
        wg.Add(1)
        go writeAndConsumeAsync(database, readCh, errCh, &wg, &mutex)
    }
    go func() {
        wg.Wait()
        close(doneCh)
    }()
    select {
    case err := <-errCh:
        return err
    case <-doneCh:
        return nil
    }
}

func readAndProduceModelAsync(readCh chan<- *models.LandRead, errCh chan<- error) {
    defer close(readCh)
    file, err := os.Open("data/land_feed.csv")
    if err != nil {
        errCh <- err
        return
    }
    defer file.Close()
    // Create a CSV reader
    reader := csv.NewReader(file)
    // Read the headers
    _, _ = reader.Read()
    lineNumber := 1
    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                return
            }
            errCh <- err
            return
        }
        readEntity, err := utils.CreateEntityFromRecord(record, lineNumber)
        if err != nil {
            errCh <- err
            return
        }
        readCh <- readEntity
        lineNumber++
    }
}

func writeAndConsumeAsync(database *db.DB, readCh <-chan *models.LandRead, errCh chan<- error, wg *sync.WaitGroup, mutex *sync.Mutex) {
    defer wg.Done()
    var buffer []*models.Land
    batchSize := 10000
    for readEntity := range readCh {
        dbEntity := models.FromReadModel(*readEntity)
        buffer = append(buffer, dbEntity)
        if len(buffer) == batchSize {
            mutex.Lock()
            err := database.CreateLands(buffer)
            mutex.Unlock()
            if err != nil {
                errCh <- err
                return
            }
            buffer = buffer[:0]
        }
    }
    // Flush any remaining entities in the buffer
    if len(buffer) > 0 {
        mutex.Lock()
        err := database.CreateLands(buffer)
        mutex.Unlock()
        if err != nil {
            errCh <- err
            return
        }
    }
}
The response received is as follows:
{
  "error": "",
  "message": "Successfully completed request for Multiprocessing For Reading And Writing.",
  "memoryUsage": 5289472,
  "elapsed": 24760674200
}
Solution 4: Further squeezing with worker pool pipeline
In this final solution, we enhance the previous design by introducing an additional type of routine to streamline the processing pipeline. The parent method establishes three channels:
recordChannel for raw string records from the file.
transformChannel for converted write entities.
doneChannel for signaling the completion of the operation.
The approach consists of:
A Producer routine reads lines from the CSV file and sends the split strings to the recordChannel.
Five Transforming routines listen to the recordChannel, convert raw records into read entities, and publish the transformed write entities to the transformChannel.
Five Writing routines listen to the transformChannel, maintain a buffer of 10,000 write entities, and write the buffer to the database when it is filled.
The doneChannel is closed once all writing routines have finished, allowing the parent method to exit.
Here’s the implementation:
type Record struct {
    LineNo int
    Data   []string
}

func MultiProcessingForReadingTransformAndWriting(database *db.DB) error {
    recordCh := make(chan *Record, 1000)
    transformCh := make(chan *models.Land, 1000)
    doneCh := make(chan struct{})
    errCh := make(chan error)
    const numTransformers = 5 // Number of transformation goroutines
    const numWriters = 5      // Number of writing goroutines
    go readAndProduceRecords(recordCh, errCh)
    var wg sync.WaitGroup
    for i := 0; i < numTransformers; i++ {
        wg.Add(1)
        go transformAndProduceDbModel(recordCh, transformCh, errCh, &wg)
    }
    var writeWg sync.WaitGroup
    var mutex sync.Mutex
    for i := 0; i < numWriters; i++ {
        writeWg.Add(1)
        go writeAndConsumeDbModel(database, transformCh, errCh, &writeWg, &mutex)
    }
    go func() {
        wg.Wait()
        close(transformCh) // Close transformCh once all transform goroutines finish
    }()
    go func() {
        writeWg.Wait()
        close(doneCh)
    }()
    select {
    case err := <-errCh:
        return err
    case <-doneCh:
        return nil
    }
}

func readAndProduceRecords(recordCh chan<- *Record, errCh chan<- error) {
    defer close(recordCh)
    file, err := os.Open("data/land_feed.csv")
    if err != nil {
        errCh <- err
        return
    }
    defer file.Close()
    reader := csv.NewReader(file)
    _, _ = reader.Read() // Read the headers
    lineNumber := 1
    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                return
            }
            errCh <- err
            return
        }
        recordCh <- &Record{
            LineNo: lineNumber,
            Data:   record,
        }
        lineNumber++
    }
}

func transformAndProduceDbModel(recordCh <-chan *Record, transformCh chan<- *models.Land, errCh chan<- error, wg *sync.WaitGroup) {
    defer wg.Done()
    for record := range recordCh {
        readEntity, err := utils.CreateEntityFromRecord(record.Data, record.LineNo)
        if err != nil {
            errCh <- err
            return
        }
        dbEntity := models.FromReadModel(*readEntity)
        transformCh <- dbEntity
    }
}

func writeAndConsumeDbModel(database *db.DB, transformCh <-chan *models.Land, errCh chan<- error, wg *sync.WaitGroup, mutex *sync.Mutex) {
    defer wg.Done()
    var buffer []*models.Land
    batchSize := 10000
    for dbEntity := range transformCh {
        buffer = append(buffer, dbEntity)
        if len(buffer) == batchSize {
            mutex.Lock()
            err := database.CreateLands(buffer)
            mutex.Unlock()
            if err != nil {
                errCh <- err
                return
            }
            buffer = buffer[:0]
        }
    }
    // Flush any remaining entities in the buffer
    if len(buffer) > 0 {
        mutex.Lock()
        err := database.CreateLands(buffer)
        mutex.Unlock()
        if err != nil {
            errCh <- err
            return
        }
    }
}
The response received was as follows:
{
  "error": "",
  "message": "Successfully completed request for Multiprocessing For Reading, Transform And Writing.",
  "memoryUsage": 4515240,
  "elapsed": 23637820200
}
Conclusion
The following table provides a comparative view of all four approaches:
| Endpoint | Description | Memory Usage (bytes) | Elapsed Time (ns) |
| --- | --- | --- | --- |
| GET /solution-one | Load all data and insert in batches. | 3,296,896,352 | 18,968,041,000 |
| GET /solution-two | Read line by line and insert in batches of 10,000. | 3,851,704 | 26,142,007,900 |
| GET /solution-three | Use multiprocessing with a worker pool for reading and writing. | 5,289,472 | 24,760,674,200 |
| GET /solution-four | Pipeline approach on worker pool for reading, transforming, and writing. | 4,515,240 | 23,637,820,200 |
From the results, we can observe the following insights:
Solution One is the fastest but consumes a disproportionate amount of memory as it loads everything into memory.
Solution Two uses the least memory but is the slowest.
Solution Three is slightly faster than Solution Two and, while it uses a bit more memory than Solution Two, it still consumes only a tiny fraction of Solution One's memory.
Solution Four further improves memory usage (second best overall) and execution time (also second best).
In many cloud environments, prolonged compute time can push costs over budget. Thus, Solution Four stands out as the optimal choice for our scenario. While it is not the absolute best on either speed or memory, it strikes a balance, ranking second best on both metrics. This reflects the utility of multithreading: it doesn't always deliver the single best number outright, but it often yields the best overall performance across multiple criteria.
In this article, we examined a real-life problem and applied our knowledge of multiprocessing to arrive at an optimal solution.