A Practical Approach to Learning Memory Optimization Using Goroutines

Siddhartha S

Introduction

After writing Tenets of Multithreading in Go: Detailed Tutorial, I felt compelled to share a problem I had to tackle during my work as a software engineer. Real-life software engineering often involves solving unique challenges, each with its own narrative.

In this article, we will analyze four different approaches to a specific problem step-by-step. We will evaluate each solution, weighing the pros and cons, to demonstrate that there is no “one-size-fits-all” approach, as the best solution depends on the circumstances at hand.

By the end, we will compare all four approaches and make an informed choice based on our case.

The problem

The problem itself is not overly complicated. We had an NFS file location where numerous CSV files were being generated by an upstream process over which we had no control.

The task was to set up a cron job that would execute at regular intervals to check for newly created files. If new files were found, the job would upload their contents into a database and subsequently move the files to an archive location. A new instance of the cron job would be launched for each file detected. The requirement was to create a Go program to perform the cron job tasks.

Code Walkthrough

To test the various approaches, I created an API application in Go. Here is the repository. When you run the application for the first time, it creates a sample CSV file with 10 million records (approximately 0.5 GB).

The API provides four endpoints:

  • GET /solution-one

  • GET /solution-two

  • GET /solution-three

  • GET /solution-four

All of these endpoints perform the same tasks:

  1. Read all lines from the file.

  2. Convert each line into a read entity.

  3. Transform the read entities into write entities.

  4. Insert the write entities into the database.

The application uses GORM as an ORM to facilitate easy bulk insert functionalities, configured to insert in batches of 1000. Each endpoint returns a response that includes:

  • Time taken for the solution.

  • Memory consumed for the operation.
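The article does not show how the two numbers are collected. As a minimal sketch, assuming the measurement wraps each solution in a helper, the standard library's runtime.ReadMemStats and time.Since are enough. The Metrics type and the measure function are hypothetical names for this illustration, and the sketch reports the cumulative bytes allocated (TotalAlloc), which may differ from what the repository actually records.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// Metrics mirrors the two numbers returned by each endpoint.
// The type and field names are hypothetical, chosen for this sketch.
type Metrics struct {
	MemoryUsage uint64        // bytes allocated while fn ran (TotalAlloc delta)
	Elapsed     time.Duration // wall-clock time spent in fn
}

// measure runs fn and reports the elapsed time together with the growth
// of the runtime's cumulative allocation counter.
func measure(fn func() error) (Metrics, error) {
	var before, after runtime.MemStats

	runtime.GC() // start from a settled heap
	runtime.ReadMemStats(&before)
	start := time.Now()

	err := fn()

	elapsed := time.Since(start)
	runtime.ReadMemStats(&after)

	return Metrics{
		MemoryUsage: after.TotalAlloc - before.TotalAlloc,
		Elapsed:     elapsed,
	}, err
}

func main() {
	var data [][]byte
	m, err := measure(func() error {
		// Stand-in workload: allocate 1,000 small buffers.
		for i := 0; i < 1000; i++ {
			data = append(data, make([]byte, 1024))
		}
		return nil
	})
	fmt.Println(len(data), m.MemoryUsage, m.Elapsed, err)
}
```

TotalAlloc only ever grows, so the subtraction cannot underflow. A measurement based on live heap size (for example HeapAlloc) would answer a slightly different question, namely how much memory is still held when the work finishes.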

The database used in the application is SQLite, which allows only one writer at a time, creating a bottleneck during write operations. With a database such as PostgreSQL, which accepts concurrent writers, the readings for the different solutions may look different, especially for the multithreaded approaches.

Solution 1: Brute force

The brute force method involves the following steps:

  1. Read all lines from the file into memory.

  2. Convert read entities and hold them in memory.

  3. Transform all read entities into write entities.

  4. Insert all write entities into the database.

Here’s the straightforward code:

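func LoadAllAndInsert(database *db.DB) error {
    // Read the entire CSV into memory. The second return value is checked
    // with len(), so it is treated here as a collection of errors.
    landReadModels, errs := utils.ReadCSVAll("data/land_feed.csv")
    if len(errs) > 0 {
        // log.Println rather than log.Fatal: Fatal exits the process,
        // so the return below would never be reached.
        log.Println(errs)
        return errors.New("reading CSV failed; check logs")
    }

    // Pre-size the slice to avoid repeated reallocation while appending.
    dbLandModels := make([]*models.Land, 0, len(landReadModels))
    for _, landReadModel := range landReadModels {
        dbLandModels = append(dbLandModels, models.FromReadModel(landReadModel))
    }

    return database.CreateLands(dbLandModels)
}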
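CreateLands itself is not shown in the article. Assuming it hands the slice to the database in batches of 1,000, as described in the walkthrough, the chunking logic might look like the following standard-library sketch. The insertInBatches helper and its insert callback are hypothetical; the repository relies on GORM for this step.

```go
package main

import "fmt"

// insertInBatches calls insert once per chunk of at most batchSize items.
// It stops at the first failing batch and reports where it started.
func insertInBatches[T any](items []T, batchSize int, insert func([]T) error) error {
	if batchSize <= 0 {
		return fmt.Errorf("batch size must be positive, got %d", batchSize)
	}
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items) // last batch may be shorter
		}
		if err := insert(items[start:end]); err != nil {
			return fmt.Errorf("batch starting at %d: %w", start, err)
		}
	}
	return nil
}

func main() {
	rows := make([]int, 2500)
	var sizes []int
	err := insertInBatches(rows, 1000, func(batch []int) error {
		sizes = append(sizes, len(batch)) // stand-in for a bulk INSERT
		return nil
	})
	fmt.Println(sizes, err) // prints: [1000 1000 500] <nil>
}
```

Batching keeps each INSERT statement bounded in size, but it does not reduce the memory held by Solution 1, because the full slice of write entities already exists before the first batch is sent.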

The response received is as follows:

{
    "error": "",
    "message": "Successfully completed request for Load All And Insert In Batches.",
    "memoryUsage": 3296896352,
    "elapsed": 18968041000
}

Solution 2: Buffered inserts along with reading

This method utilizes buffered inserts while reading:

  1. Read a line from the file and convert it to a read entity.

  2. Convert the read entity into a write entity.

  3. Push the write entity into a buffer.

  4. When the buffer is full (10,000 entities), perform a bulk insert into the database and clear the buffer; otherwise, continue populating it.

Here’s the code:

func ReadLineAndAndInsertInBatches(database *db.DB) error {
    file, err := os.Open("data/land_feed.csv")
    if err != nil {
        return err
    }
    defer file.Close()
    // Create a CSV reader
    reader := csv.NewReader(file)

    //Read the headers
    _, _ = reader.Read()
    lineNumber := 1
    var buffer []*models.Land

    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }
        readEntity, err := utils.CreateEntityFromRecord(record, lineNumber)
        if err != nil {
            return err
        }
        dbEntity := models.FromReadModel(*readEntity)
        buffer = append(buffer, dbEntity)
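        if len(buffer) == 10000 {
            // Surface insert failures instead of silently dropping a batch.
            if err := database.CreateLands(buffer); err != nil {
                return err
            }
            buffer = buffer[:0]
        }
        lineNumber++
    }
    // Flush any remaining entities in the buffer
    if len(buffer) > 0 {
        if err := database.CreateLands(buffer); err != nil {
            return err
        }
    }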
    return nil
}

The response received is as follows:

{
    "error": "",
    "message": "Successfully completed request for Read Line And Insert In Batches.",
    "memoryUsage": 3851704,
    "elapsed": 26142007900
}

Solution 3: Using goroutines to optimize: Worker Pool

To further optimize the process, I implemented goroutines with a worker pool approach. This method employs two main channels: readChannel (readCh in the code) for read entities and doneChannel (doneCh) for signaling completion, plus an error channel (errCh) for reporting failures. The process consists of the following:

  1. A Producer goroutine reads each line from the CSV file, converts it into a read entity, and sends it to the readChannel.

  2. Five Consumer goroutines listen to the readChannel and convert each read entity into a write entity. Each consumer maintains its own buffer of 10,000 write items; when the buffer is full, it is written to the database under a shared mutex, and the buffer is cleared.

  3. Once all consumer goroutines have finished, the doneChannel is closed, allowing the parent method to exit.

Here’s the implementation:

func MultiprocessingForReadingAndWriting(database *db.DB) error {

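    const numWriters = 5 // Number of writing goroutines

    readCh := make(chan *models.LandRead, 1000)
    doneCh := make(chan struct{})
    // Buffered (one slot per writer plus the reader) so that a goroutine
    // reporting an error never blocks after the parent has returned.
    errCh := make(chan error, numWriters+1)
    go readAndProduceModelAsync(readCh, errCh)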

    var wg sync.WaitGroup
    var mutex sync.Mutex

    for i := 0; i < numWriters; i++ {
        wg.Add(1)
        go writeAndConsumeAsync(database, readCh, errCh, &wg, &mutex)
    }

    go func() {
        wg.Wait()
        close(doneCh)
    }()

    // Both cases return, so a single select is enough; no loop is needed.
    select {
    case err := <-errCh:
        return err
    case <-doneCh:
        return nil
    }
}

func readAndProduceModelAsync(readCh chan<- *models.LandRead, errCh chan<- error) {

    defer close(readCh)

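    file, err := os.Open("data/land_feed.csv")
    if err != nil {
        // Report the failure to the parent instead of calling log.Fatal,
        // which would terminate the whole process from inside a goroutine.
        errCh <- err
        return
    }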
    defer file.Close()
    // Create a CSV reader
    reader := csv.NewReader(file)
    //Read the headers
    _, _ = reader.Read()

    lineNumber := 1

    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                return
            }
            errCh <- err
            return
        }

        readEntity, err := utils.CreateEntityFromRecord(record, lineNumber)
        if err != nil {
            errCh <- err
            return
        }

        readCh <- readEntity
        lineNumber++
    }
}

func writeAndConsumeAsync(database *db.DB, readCh <-chan *models.LandRead, errCh chan<- error, wg *sync.WaitGroup, mutex *sync.Mutex) {
    defer wg.Done()

    var buffer []*models.Land
    batchSize := 10000

    for readEntity := range readCh {
        dbEntity := models.FromReadModel(*readEntity)
        buffer = append(buffer, dbEntity)


        if len(buffer) == batchSize {
            mutex.Lock()
            err := database.CreateLands(buffer)
            mutex.Unlock()
            if err != nil {
                errCh <- err
                return
            }
            buffer = buffer[:0]
        }
    }
    // Flush any remaining entities in the buffer
    if len(buffer) > 0 {
        mutex.Lock()
        err := database.CreateLands(buffer)
        mutex.Unlock()
        if err != nil {
            errCh <- err
            return
        }
    }
}

The response received is as follows:

{
    "error": "",
    "message": "Successfully completed request for Multiprocessing For Reading And Writing.",
    "memoryUsage": 5289472,
    "elapsed": 24760674200
}

Solution 4: Further squeezing with worker pool pipeline

In this final solution, we enhance the previous design by introducing an additional type of goroutine to streamline the processing pipeline. The parent method establishes three channels (an error channel, errCh, is used as before):

  • recordChannel (recordCh in the code) for raw string records from the file.

  • transformChannel (transformCh) for converted write entities.

  • doneChannel (doneCh) for signaling the completion of the operation.

The approach consists of:

  1. A Producer goroutine that reads lines from the CSV file and sends the split strings to the recordChannel.

  2. Five Transforming goroutines that listen to recordChannel, convert raw records into read entities, transform those into write entities, and publish them to the transformChannel.

  3. Five Writing goroutines that listen to the transformChannel, each maintaining a buffer of 10,000 write entities and writing it to the database when the buffer is full.

  4. The doneChannel is closed once all writing goroutines have finished, allowing the parent method to exit.

Here’s the implementation:

type Record struct {
    LineNo int
    Data   []string
}

func MultiProcessingForReadingTransformAndWriting(database *db.DB) error {
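    const numTransformers = 5 // Number of transformation goroutines
    const numWriters = 5      // Number of writing goroutines

    recordCh := make(chan *Record, 1000)
    transformCh := make(chan *models.Land, 1000)
    doneCh := make(chan struct{})
    // Buffered (one slot per goroutine that can fail) so that reporting an
    // error never blocks after the parent has returned.
    errCh := make(chan error, numTransformers+numWriters+1)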

    go readAndProduceRecords(recordCh, errCh)

    var wg sync.WaitGroup
    for i := 0; i < numTransformers; i++ {
        wg.Add(1)
        go transformAndProduceDbModel(recordCh, transformCh, errCh, &wg)
    }

    var writeWg sync.WaitGroup
    var mutex sync.Mutex
    for i := 0; i < numWriters; i++ {
        writeWg.Add(1)
        go writeAndConsumeDbModel(database, transformCh, errCh, &writeWg, &mutex)
    }

    go func() {
        wg.Wait()
        close(transformCh) // Close transformCh once all transform goroutines finish
    }()

    go func() {
        writeWg.Wait()
        close(doneCh)
    }()

    // Both cases return, so a single select is enough; no loop is needed.
    select {
    case err := <-errCh:
        return err
    case <-doneCh:
        return nil
    }
}

func readAndProduceRecords(recordCh chan<- *Record, errCh chan<- error) {
    defer close(recordCh)
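    file, err := os.Open("data/land_feed.csv")
    if err != nil {
        // Report the failure to the parent instead of calling log.Fatal,
        // which would terminate the whole process from inside a goroutine.
        errCh <- err
        return
    }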
    defer file.Close()
    reader := csv.NewReader(file)
    _, _ = reader.Read() // Read the headers
    lineNumber := 1
    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                return
            }
            errCh <- err
            return
        }
        recordCh <- &Record{
            LineNo: lineNumber,
            Data:   record,
        }
        lineNumber++
    }
}

func transformAndProduceDbModel(recordCh <-chan *Record, transformCh chan<- *models.Land, errCh chan<- error, wg *sync.WaitGroup) {
    defer wg.Done()
    for record := range recordCh {
        readEntity, err := utils.CreateEntityFromRecord(record.Data, record.LineNo)
        if err != nil {
            errCh <- err
            return
        }
        dbEntity := models.FromReadModel(*readEntity)
        transformCh <- dbEntity
    }
}

func writeAndConsumeDbModel(database *db.DB, transformCh <-chan *models.Land, errCh chan<- error, wg *sync.WaitGroup, mutex *sync.Mutex) {
    defer wg.Done()
    var buffer []*models.Land
    batchSize := 10000
    for dbEntity := range transformCh {
        buffer = append(buffer, dbEntity)
        if len(buffer) == batchSize {
            mutex.Lock()
            err := database.CreateLands(buffer)
            mutex.Unlock()
            if err != nil {
                errCh <- err
                return
            }
            buffer = buffer[:0]
        }
    }
    // Flush any remaining entities in the buffer
    if len(buffer) > 0 {
        mutex.Lock()
        err := database.CreateLands(buffer)
        mutex.Unlock()
        if err != nil {
            errCh <- err
            return
        }
    }
}
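The key ordering rule in this pipeline is that each stage closes its output channel only after all of its own goroutines have finished, which is what the wg.Wait() followed by close(transformCh) does above. A compact sketch of that pattern, assuming Go 1.18+ generics and using hypothetical names (stage, sumPipeline) with strings and integers in place of records and entities:

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// stage starts n goroutines that apply fn to every value received from in.
// The returned channel is closed once all n goroutines have finished.
func stage[In, Out any](in <-chan In, n int, fn func(In) Out) <-chan Out {
	out := make(chan Out, 16)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // safe: no goroutine of this stage can send any more
	}()
	return out
}

// sumPipeline wires producer -> transformers -> consumer and returns the
// sum of all parsed values.
func sumPipeline(raw []string, workers int) int {
	records := make(chan string, 16)
	go func() { // producer
		defer close(records)
		for _, r := range raw {
			records <- r
		}
	}()

	parsed := stage[string, int](records, workers, func(s string) int {
		n, _ := strconv.Atoi(s) // parse errors ignored to keep the sketch short
		return n
	})

	total := 0
	for n := range parsed { // a single consumer stands in for the writers
		total += n
	}
	return total
}

func main() {
	fmt.Println(sumPipeline([]string{"1", "2", "3", "4"}, 3)) // prints: 10
}
```

Closing a channel from the sending side, and only after every sender is done, avoids both "send on closed channel" panics and consumers that wait forever.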

The response received was as follows:

{
    "error": "",
    "message": "Successfully completed request for Multiprocessing For Reading, Transform And Writing.",
    "memoryUsage": 4515240,
    "elapsed": 23637820200
}

Conclusion

The following table provides a comparative view of all four approaches:

| Endpoint | Description | Memory Usage (bytes) | Elapsed Time (ns) |
| --- | --- | --- | --- |
| GET /solution-one | Load all data and insert in batches. | 3,296,896,352 | 18,968,041,000 |
| GET /solution-two | Read line by line and insert in batches of 10,000. | 3,851,704 | 26,142,007,900 |
| GET /solution-three | Use multiprocessing with a worker pool for reading and writing. | 5,289,472 | 24,760,674,200 |
| GET /solution-four | Pipeline approach on worker pool for reading, transforming, and writing. | 4,515,240 | 23,637,820,200 |

From the results, we can observe the following insights:

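  • Solution One is the fastest (about 19.0 s) but consumes a disproportionate amount of memory (about 3.3 GB) because it loads everything into memory.

  • Solution Two uses the least memory (about 3.9 MB) but is the slowest (about 26.1 s).

  • Solution Three improves slightly on Solution Two's time (about 24.8 s) and uses a tiny fraction of Solution One's memory (about 5.3 MB), although that is the largest footprint among the three streaming approaches.

  • Solution Four improves on Solution Three in both memory usage (about 4.5 MB, second best overall) and execution time (about 23.6 s, also second best).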

In many cloud environments, both long-running jobs and large memory footprints drive up costs and can lead to overshooting budgets. Thus, Solution Four stands out as the optimal choice for our scenario. While it is not the absolute best for speed or memory usage, it strikes a balance, ranking second best in both metrics. This reflects the utility of multithreading: while it doesn’t always deliver the best result on any single metric, it often yields the best overall performance across multiple criteria.

In this article, we examined a real-life problem and applied our knowledge of concurrency with goroutines to arrive at an optimal solution.

