Producer-Consumer Pattern

Abdullah Khairy

The producer-consumer pattern is a fundamental synchronization pattern in multithreaded programming. In this article, we'll dive deep into implementing this pattern in C++ using low-level synchronization primitives, specifically mutexes and condition variables. Rather than relying on pre-built thread-safe queues, we'll build our solution from the ground up to understand how these mechanisms work.

What is the Producer-Consumer Pattern?

The producer-consumer pattern involves two types of entities:

  1. Producers: Threads that generate data or tasks

  2. Consumers: Threads that process the data or tasks generated by producers

These entities communicate through a shared buffer or queue. The pattern allows for decoupling data production from its consumption, enabling parallel processing and improved throughput.

Real-Life Analogy

Consider a restaurant kitchen and dining area.

  • Producers (Chefs in the Kitchen): The chefs prepare food (data) and place it on a counter or serving area (a shared buffer). They keep cooking as long as there’s space on the counter.

  • Consumers (Waiters or Customers): The waiters pick up the prepared food from the counter and serve it to customers, or customers take the food directly (consuming data). They can only take food if there’s something available on the counter.

  • Shared Buffer (Counter): The counter has limited space, so chefs can only add food if there’s room, and waiters can only take food if there’s something there. If the counter is full, chefs wait (producer blocked). If it’s empty, waiters wait (consumer blocked).

  • Synchronization: The kitchen staff and waiters coordinate to avoid conflicts—like ensuring chefs don’t overfill the counter or waiters don’t try to grab food that isn’t there. This is similar to using locks or semaphores in programming to manage access to the shared buffer.

What Problems Does It Solve?

The producer-consumer pattern addresses several critical challenges in concurrent programming:

  1. Workload Distribution: Efficiently distributes tasks across multiple worker threads

  2. Rate Mismatch Handling: Manages scenarios where producers and consumers work at different speeds

  3. Resource Utilization: Maximizes CPU utilization by allowing parallel processing

  4. Backpressure Mechanism: Provides natural throttling when production outpaces consumption

  5. Decoupling: Separates the concerns of data generation and processing

Synchronization Challenges

Implementing this pattern correctly requires addressing several concurrency issues:

  1. Race Conditions: Multiple threads accessing shared data simultaneously

  2. Buffer Overflow: Producers overwhelming a bounded buffer

  3. Buffer Underflow: Consumers attempting to process from an empty buffer

  4. Deadlocks: Threads waiting indefinitely due to circular dependencies

Implementing the Producer-Consumer Pattern

Let's implement a complete producer-consumer solution using a bounded buffer:

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// Shared resources
std::queue<int> buffer;                  // Plain queue; not thread-safe on its own
std::mutex mtx;                          // Protects every access to the queue
std::condition_variable producer_cv;     // Producers wait here while the buffer is full
std::condition_variable consumer_cv;     // Consumers wait here while the buffer is empty
const std::size_t BUFFER_CAPACITY = 5;   // Maximum buffer size

// Producer function
void producer(int num_items) {
    for (int i = 0; i < num_items; ++i) {
        std::unique_lock<std::mutex> lock(mtx);

        // Wait while the buffer is full
        producer_cv.wait(lock, [&] { return buffer.size() < BUFFER_CAPACITY; });

        buffer.push(i);
        std::cout << "Produced: " << i << std::endl;

        lock.unlock();
        consumer_cv.notify_one();  // Wake up one waiting consumer
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// Consumer function
void consumer(int num_items) {
    for (int i = 0; i < num_items; ++i) {
        std::unique_lock<std::mutex> lock(mtx);

        // Wait while the buffer is empty
        consumer_cv.wait(lock, [&] { return !buffer.empty(); });

        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumed: " << item << std::endl;

        lock.unlock();
        producer_cv.notify_one();  // Wake up one waiting producer
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

int main() {
    const int NUM_ITEMS = 10;
    std::thread prod(producer, NUM_ITEMS);
    std::thread cons(consumer, NUM_ITEMS);

    prod.join();
    cons.join();

    return 0;
}

Breaking Down the Implementation

1. Shared State

Our implementation contains:

  • A queue for storing items

  • A capacity limit

  • A mutex for thread synchronization

  • Two condition variables:

    • producer_cv: Signaled when the buffer is not full (producers wait on this)

    • consumer_cv: Signaled when the buffer is not empty (consumers wait on this)

2. Key Functions

The implementation provides two key functions:

  • producer(): Adds an item to the buffer, waiting if the buffer is full

  • consumer(): Removes an item from the buffer, waiting if the buffer is empty

3. Condition Variable Logic

The most crucial part is how condition variables are used:

  • Producers wait on producer_cv until there's space in the buffer

  • Consumers wait on consumer_cv until there are items in the buffer

  • After producing, we notify one consumer through consumer_cv

  • After consuming, we notify one producer through producer_cv
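
The free-function version keeps its state in globals and has no way to stop early. One way to address both points is to wrap the state in a class and add a shutdown flag to each wait predicate. The following is a minimal sketch under those assumptions; the class name BoundedBuffer and its methods produce(), consume(), and shutdown() are illustrative choices, not a fixed API.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>

// Illustrative class; names and return types are assumptions of this sketch.
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {}

    // Returns false if shutdown was requested before the item could be added.
    bool produce(int item) {
        std::unique_lock<std::mutex> lock(mtx_);
        producer_cv_.wait(lock, [this] { return queue_.size() < capacity_ || shutdown_; });
        if (shutdown_) return false;
        queue_.push(item);
        lock.unlock();
        consumer_cv_.notify_one();  // an item is now available
        return true;
    }

    // Returns std::nullopt once shutdown was requested and the buffer is drained.
    std::optional<int> consume() {
        std::unique_lock<std::mutex> lock(mtx_);
        consumer_cv_.wait(lock, [this] { return !queue_.empty() || shutdown_; });
        if (queue_.empty()) return std::nullopt;
        int item = queue_.front();
        queue_.pop();
        lock.unlock();
        producer_cv_.notify_one();  // a slot is now free
        return item;
    }

    // Wakes every waiting thread so it can observe the flag and return.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            shutdown_ = true;
        }
        producer_cv_.notify_all();
        consumer_cv_.notify_all();
    }

private:
    std::queue<int> queue_;
    const std::size_t capacity_;
    bool shutdown_ = false;
    std::mutex mtx_;
    std::condition_variable producer_cv_;
    std::condition_variable consumer_cv_;
};
```

In this sketch, consumers keep draining items that were already queued when shutdown() is called, while producers stop immediately. Other policies, such as discarding leftover items, are equally possible.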

Visualizing the Producer-Consumer Pattern

Let's visualize the flow with a diagram:

┌───────────────┐     ┌──────────────┐     ┌───────────────┐
│   Producer 1  │     │              │     │  Consumer 1   │
└───────┬───────┘     │              │     └───────┬───────┘
        │             │              │             │
        │ produce()   │   Bounded    │  consume()  │
        ▼             │    Buffer    │             ▼
┌───────────────┐     │              │     ┌───────────────┐
│   Producer 2  ├────►│  [■■■□□□□□]  ├────►│  Consumer 2   │
└───────┬───────┘     │              │     └───────┬───────┘
        │             │              │             │
        │             │              │             │
        ▼             │              │             ▼
┌───────────────┐     └──────────────┘     ┌───────────────┐
│   Producer 3  │                          │  Consumer 3   │
└───────────────┘                          └───────────────┘

          │                                       │
          │                                       │
          ▼                                       ▼
    wait on not_full                      wait on not_empty
    if buffer is full                      if buffer is empty
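
The diagram shows several producers and consumers sharing one buffer. A possible way to run that setup and let the consumers stop cleanly is sketched below. It assumes a hypothetical run_pipeline() function and an active_producers counter introduced only for this example, and it needs at least one consumer so that producers are never left blocked on a full buffer.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Runs the given number of producers and consumers over one bounded queue
// and returns the sum of all consumed items. Hypothetical helper for
// illustration; requires consumers >= 1 whenever anything is produced.
long long run_pipeline(int producers, int consumers, int items_per_producer) {
    std::queue<int> buffer;
    std::mutex mtx;
    std::condition_variable producer_cv;  // "not full" in the diagram
    std::condition_variable consumer_cv;  // "not empty" in the diagram
    const std::size_t capacity = 8;
    int active_producers = producers;     // guarded by mtx
    long long total = 0;                  // guarded by mtx

    auto produce = [&] {
        for (int i = 1; i <= items_per_producer; ++i) {
            std::unique_lock<std::mutex> lock(mtx);
            producer_cv.wait(lock, [&] { return buffer.size() < capacity; });
            buffer.push(i);
            lock.unlock();
            consumer_cv.notify_one();
        }
        {
            std::lock_guard<std::mutex> lock(mtx);
            --active_producers;
        }
        consumer_cv.notify_all();  // let idle consumers notice the end of input
    };

    auto consume = [&] {
        for (;;) {
            std::unique_lock<std::mutex> lock(mtx);
            consumer_cv.wait(lock, [&] { return !buffer.empty() || active_producers == 0; });
            if (buffer.empty()) return;  // drained and no producers remain
            total += buffer.front();
            buffer.pop();
            lock.unlock();
            producer_cv.notify_one();
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < producers; ++i) threads.emplace_back(produce);
    for (int i = 0; i < consumers; ++i) threads.emplace_back(consume);
    for (auto& t : threads) t.join();
    return total;
}
```

Each producer here pushes the values 1 through items_per_producer, so the returned sum gives a quick check that no item was lost or consumed twice.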

Using C++20 Features

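C++20 introduces new synchronization primitives like std::counting_semaphore and std::latch that can simplify producer-consumer implementations. The version below uses two counting semaphores, one tracking free slots and one tracking available items, together with std::jthread, which joins in its destructor: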

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <semaphore>
#include <thread>

constexpr int MAX_QUEUE_SIZE = 10;

class SemaphoreQueue {
public:
    void push(int value) {
        space_available.acquire(); // wait for space
        {
            std::scoped_lock lock(mutex_);
            queue_.push(value);
        }
        items_available.release(); // signal item added
    }

    // Waits up to `timeout` for an item; returns false if none arrived in time.
    bool try_pop(int& value, std::chrono::milliseconds timeout) {
        if (!items_available.try_acquire_for(timeout)) return false;
        {
            // Holding an items_available permit guarantees the queue is non-empty.
            std::scoped_lock lock(mutex_);
            value = queue_.front();
            queue_.pop();
        }
        space_available.release(); // signal space available
        return true;
    }

private:
    std::queue<int> queue_;
    std::mutex mutex_;
    std::counting_semaphore<MAX_QUEUE_SIZE> items_available{0};
    std::counting_semaphore<MAX_QUEUE_SIZE> space_available{MAX_QUEUE_SIZE};
};

void producer(SemaphoreQueue& q, std::atomic<bool>& running) {
    int counter = 0;
    while (running.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        q.push(counter);
        std::cout << "[Producer] Produced: " << counter << '\n';
        ++counter;
    }
}

void consumer(SemaphoreQueue& q, std::atomic<bool>& producer_done) {
    int value;
    for (;;) {
        // Read the flag before popping: if it was already set and the pop
        // still times out, nothing more can arrive and the queue is drained.
        const bool done = producer_done.load();
        if (q.try_pop(value, std::chrono::milliseconds(100))) {
            std::cout << "[Consumer] Consumed: " << value << '\n';
            std::this_thread::sleep_for(std::chrono::milliseconds(300));
        } else if (done) {
            break;
        }
    }
}

int main() {
    SemaphoreQueue queue;
    std::atomic<bool> running{true};
    std::atomic<bool> producer_done{false};

    std::jthread prod_thread([&] { producer(queue, running); });
    std::jthread cons_thread([&] { consumer(queue, producer_done); });

    std::this_thread::sleep_for(std::chrono::seconds(5));
    running = false;
    prod_thread.join();    // after this, no more items will be pushed
    producer_done = true;  // let the consumer drain the queue and exit

    // cons_thread joins automatically in its destructor
    return 0;
}

External Libraries for Producer-Consumer in C++

While we've focused on implementing the pattern from scratch, several libraries provide ready-to-use implementations:

  1. Boost.Thread: Provides boost::sync_bounded_queue and other synchronization primitives

  2. Intel Threading Building Blocks (TBB): Offers tbb::concurrent_queue, the blocking tbb::concurrent_bounded_queue, and a rich task scheduler

  3. libcds: A C++ library of concurrent data structures with several producer-consumer queue implementations

  4. folly: Facebook's open-source library includes folly::ProducerConsumerQueue, a lock-free queue for a single producer and a single consumer

  5. concurrentqueue: A fast multi-producer, multi-consumer lock-free concurrent queue by Cameron Desrochers

Real-World Applications

The producer-consumer pattern is used in:

  1. Job Schedulers: Processing tasks in distributed systems

  2. Media Processing: Audio/video encoding pipelines

  3. Web Servers: Handling client requests with thread pools

  4. Database Systems: Query processing and result buffers

  5. Message Queues: Implementing pub-sub systems

  6. Simulation Systems: Event processing in discrete event simulations

Performance Considerations

When implementing producer-consumer systems, consider:

  1. Buffer Size: Too small a buffer causes frequent blocking and context switches; too large a buffer wastes memory

  2. Contention: High producer or consumer counts can lead to lock contention

  3. Granularity: Processing items in batches can improve throughput

  4. Memory Allocation: Pre-allocating buffers can reduce overhead

  5. Cache Considerations: Design data structures to minimize false sharing
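
To make the batching point concrete, a consumer can take every pending item in one critical section instead of locking once per item. The sketch below shows one way to do that by swapping the queue out under the lock; the BatchQueue class and its pop_all() method are hypothetical names, and the queue is left unbounded to keep the focus on batching.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Illustrative queue that hands out items in batches.
class BatchQueue {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(value);
        }
        cv_.notify_one();
    }

    // Blocks until at least one item is available, then returns all
    // pending items at once, leaving the internal queue empty.
    std::queue<int> pop_all() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        std::queue<int> batch;
        batch.swap(queue_);  // constant-time handover under the lock
        return batch;
    }

private:
    std::queue<int> queue_;
    std::mutex mtx_;
    std::condition_variable cv_;
};
```

The consumer then processes the returned batch without holding the mutex, so producers are blocked only for the duration of the swap.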

Conclusion

The producer-consumer pattern using mutex and condition variables provides a powerful mechanism for coordinating work between threads. Understanding the underlying synchronization primitives gives you the flexibility to tailor implementations to specific requirements and constraints.

By implementing the pattern directly using mutex and condition variables, you gain insights into thread synchronization that using high-level abstractions might obscure. This knowledge is invaluable when debugging concurrency issues or optimizing performance in multithreaded applications.

Next time you need to distribute work among threads, consider how the producer-consumer pattern might help you build more efficient and robust concurrent systems.
