Understanding Ruby 3.3 Concurrency: A Comprehensive Guide

BestWeb Ventures

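Ruby 3.3 continues the concurrency work that began in Ruby 3.0 with Ractors and the fiber scheduler interface, and adds, among other things, an opt-in M:N thread scheduler. Together these changes give Ruby applications more options for parallel processing and concurrent operations.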

This advancement is particularly crucial as modern applications increasingly demand efficient handling of multiple tasks simultaneously, especially in domains like AI and ML.

This article explores Ruby 3's concurrency ecosystem, its various concurrency models, and their practical applications in modern software development.

Ruby Concurrency and its Ecosystem

Ruby's concurrency ecosystem has evolved significantly, offering developers multiple tools and abstractions to handle concurrent operations effectively.

The ecosystem now includes improved implementations of Threads and Fibers and the still-experimental Ractor system, along with various libraries and frameworks that build on these features.

Key Components of Ruby's Concurrency Ecosystem:

  • Thread API for traditional multi-threading

  • Fiber API for lightweight concurrency

  • Ractor for parallel execution

  • AsyncIO libraries (async, async-io)

  • Concurrent Ruby gem

  • Ruby Queue implementation

Code Examples Demonstrating Ruby’s Concurrency Capabilities

Let's look at two practical examples that demonstrate Ruby 3's concurrency capabilities:

Example 1: Concurrent HTTP Requests using Thread Pool

# Example 1: Concurrent HTTP Requests using Thread Pool
require 'net/http'
require 'concurrent-ruby'

class WebCrawler
  def fetch_urls(urls, max_threads: 5)
    pool = Concurrent::FixedThreadPool.new(max_threads)
    promises = urls.map do |url|
      Concurrent::Promise.execute(executor: pool) do
        fetch_url(url)
      end
    end

    results = promises.map(&:value!)
    pool.shutdown
    results
  end

  private

  def fetch_url(url)
    uri = URI(url)
    response = Net::HTTP.get_response(uri)
    { url: url, status: response.code, body: response.body }
  rescue => e
    { url: url, error: e.message }
  end
end

# Usage
crawler = WebCrawler.new
urls = ['https://api1.example.com', 'https://api2.example.com']
results = crawler.fetch_urls(urls)

Explanation: This example demonstrates a thread pool-based web crawler that fetches multiple URLs concurrently. It uses the concurrent-ruby gem's FixedThreadPool to cap the number of threads, preventing resource exhaustion. Each Promise runs fetch_url on the pool, and because fetch_url rescues its own exceptions, a failed request shows up as an error entry in the results instead of raising from value!. This pattern suits I/O-bound tasks like HTTP requests.

Example 2: Concurrent Data Processing with Queues

# Example 2: Concurrent Data Processing with Queues
require 'thread'

class DataProcessor
  def initialize(worker_count: 4)
    @queue = Queue.new
    @results = Queue.new
    @workers = worker_count
  end

  def process_data(items)
    start_workers
    enqueue_items(items)
    collect_results(items.size)
  end

  private

  def start_workers
    @worker_threads = @workers.times.map do
      Thread.new do
        while item = @queue.pop
          result = process_item(item)
          @results.push(result)
        end
      end
    end
  end

  def enqueue_items(items)
    items.each { |item| @queue.push(item) }
    @workers.times { @queue.push(nil) } # Signal workers to stop
  end

  def collect_results(expected_count)
    results = []
    expected_count.times { results << @results.pop }
    @worker_threads.each(&:join)
    results
  end

  def process_item(item)
    # Simulate processing
    sleep(rand * 0.1)
    { item: item, processed: true }
  end
end

Explanation: This implementation showcases a thread-safe producer-consumer pattern using Ruby's Queue class. It creates a pool of worker threads that process items from an input queue and push results to an output queue. The pattern is particularly useful for processing large datasets where tasks can be broken down into smaller, independent units of work.
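
As a variation on the shutdown signal, Ruby's Queue can be closed once all items are enqueued: a closed and drained queue makes pop return nil, which ends each worker loop without pushing one nil per worker. The following is a minimal sketch of that idea; square_all is a hypothetical helper used only for illustration, and it assumes the items themselves are never nil or false.

```ruby
# Sketch: producer-consumer shutdown via Queue#close instead of nil sentinels.
# square_all is a hypothetical helper, not part of the article's DataProcessor.
def square_all(items, worker_count: 3)
  jobs = Queue.new
  results = Queue.new

  workers = Array.new(worker_count) do
    Thread.new do
      # pop returns nil once the queue is closed and empty
      while (n = jobs.pop)
        results << n * n
      end
    end
  end

  items.each { |n| jobs << n }
  jobs.close            # no more work will arrive
  workers.each(&:join)  # wait for the workers to drain the queue

  Array.new(results.size) { results.pop }
end

squares = square_all([1, 2, 3, 4, 5])
puts squares.sort.inspect
```

Results arrive in completion order, so the usage line sorts them before printing.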

Ruby’s Concurrency vs Parallelism

While concurrency and parallelism are often used interchangeably, they represent different concepts in Ruby 3:

  • Concurrency: Managing multiple tasks and making progress on them over time

  • Parallelism: Executing multiple tasks simultaneously on different processors

Ruby 3 introduces better support for true parallelism through Ractors, while maintaining its existing concurrency mechanisms. Here are two examples demonstrating both concepts:

Example 1: Parallel Processing with Ractors

class ParallelProcessor
  def self.parallel_map(array)
    slice_size = (array.size / 4.0).ceil
    ractors = array.each_slice(slice_size).map do |slice|
      Ractor.new(slice) do |data|
        data.map do |n|
          # Directly use the logic here to avoid Proc isolation issues
          Math.sqrt(n ** 3).round(5)
        end
      end
    end

    ractors.map(&:take).flatten
  end
end

# Usage
numbers = (1..1000).to_a
result = ParallelProcessor.parallel_map(numbers)

puts result

Explanation: This example demonstrates true parallelism using Ruby’s experimental Ractor feature. It splits an array into chunks, with each chunk processed independently in parallel by separate Ractors. Each Ractor performs a CPU-intensive calculation, allowing for effective utilization of multiple CPU cores. By embedding the computation logic directly in each Ractor block, this implementation avoids issues related to Proc isolation and unshareable objects, which can arise when working with Ractors. This approach is especially beneficial for mathematical computations or other CPU-bound tasks that can be parallelized efficiently.

Example 2: Concurrent vs Parallel File Processing

# Example 2: Concurrent vs Parallel File Processing
require 'fileutils'

class FileProcessor
  # Defined on the class so a Ractor can call it: inside a Ractor block,
  # self is the Ractor, so instance methods of FileProcessor are unreachable.
  def self.process_file(file)
    # Simulate file processing
    content = File.read(file)
    processed = content.upcase
    File.write("processed/#{File.basename(file)}", processed)
    { file: file, status: 'processed' }
  end

  def process_files_concurrent(files)
    FileUtils.mkdir_p('processed') # create the output directory once, up front
    threads = files.map do |file|
      Thread.new { FileProcessor.process_file(file) }
    end
    threads.map(&:value)
  end

  def process_files_parallel(files)
    FileUtils.mkdir_p('processed') # done in the main Ractor, before spawning
    ractors = files.map do |file|
      Ractor.new(file) { |f| FileProcessor.process_file(f) }
    end
    ractors.map(&:take)
  end
end

Explanation: This example contrasts concurrent and parallel approaches to file processing. The concurrent version uses threads, suitable for I/O-bound operations like file reading/writing, while the parallel version uses Ractors for true parallelism. It demonstrates how the same task can be implemented differently based on whether the bottleneck is I/O (threads) or CPU (Ractors).

Ruby - Threads vs Fibers vs Ractors

Understanding the differences between Threads, Fibers, and Ractors is crucial for choosing the right concurrency primitive for your needs.

Let's explore how these three mechanisms differ in their approach to concurrent programming and their ideal use cases.

Understanding the Core Differences

Ruby's concurrency story has evolved significantly with these three distinct mechanisms.

Threads, the traditional approach, operate within the same process and share memory space. They provide a familiar concurrency model, but in CRuby only one thread at a time can execute Ruby code because of the Global VM Lock (GVL), often called the Global Interpreter Lock (GIL).

Each thread maintains its own execution context and stack, making them relatively heavyweight compared to other options.
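
The lock is released while a thread waits, which is why threads still help with I/O-bound work. The sketch below illustrates this with sleep standing in for blocking I/O; timed is a hypothetical helper, and the exact timings will vary by machine.

```ruby
# Sketch: four threads each wait 0.2s. The waits overlap, so the threaded
# run takes roughly 0.2s instead of roughly 0.8s.
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

sequential = timed { 4.times { sleep(0.2) } }
threaded   = timed { Array.new(4) { Thread.new { sleep(0.2) } }.each(&:join) }

puts format('sequential: %.2fs, threaded: %.2fs', sequential, threaded)
```

A CPU-bound loop in place of sleep would not show the same speedup, since only one thread can run Ruby code at a time.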

Fibers, introduced as a lightweight alternative, represent a fundamentally different approach to concurrency. They operate on a cooperative scheduling model, where each fiber must explicitly yield control to others.

This makes them efficient for managing many concurrent operations, particularly in scenarios involving I/O operations. Unlike threads, fibers are never preempted: each fiber runs inside the thread that created it until it yields, and it needs far less memory than a thread.
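
Cooperative scheduling can be seen with the core Fiber API alone. In this minimal sketch (the ping and pong names are illustrative), each fiber runs until it calls Fiber.yield, and the caller decides which fiber to resume next.

```ruby
# Sketch: two fibers hand control back to the caller explicitly.
log = []

ping = Fiber.new do
  log << 'ping 1'
  Fiber.yield          # pause here until resumed again
  log << 'ping 2'
end

pong = Fiber.new do
  log << 'pong 1'
  Fiber.yield
  log << 'pong 2'
end

# The caller chooses the interleaving; nothing is preempted.
[ping, pong, ping, pong].each(&:resume)

puts log.inspect
# prints ["ping 1", "pong 1", "ping 2", "pong 2"]
```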

Ractors, Ruby's newest concurrency primitive, take a revolutionary approach by providing true parallel execution capabilities. They operate with isolated memory spaces and communicate through message passing, effectively bypassing the GIL's limitations.

This isolation prevents the common pitfalls of shared-state concurrency while enabling genuine parallel execution of Ruby code.
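
What isolation means in practice is that only shareable objects (immutable values and deeply frozen structures) can be referenced from more than one Ractor; everything else is copied or moved when sent. The sketch below checks a few objects with Ractor.shareable? and deep-freezes a hash with Ractor.make_shareable. The variable names and the config contents are illustrative assumptions.

```ruby
# Sketch: which objects may be shared between Ractors without copying.
mutable_name = String.new('ruby')   # an unfrozen String
config = { retries: 3, hosts: [String.new('a'), String.new('b')] }

int_ok     = Ractor.shareable?(42)                       # Integers are immutable
mutable_ok = Ractor.shareable?(mutable_name)             # unfrozen, so not shareable
frozen_ok  = Ractor.shareable?(mutable_name.dup.freeze)  # frozen copy is shareable

# make_shareable freezes the whole object graph in place
Ractor.make_shareable(config)
deep_frozen = config.frozen? && config[:hosts].all?(&:frozen?)

puts [int_ok, mutable_ok, frozen_ok, deep_frozen].inspect
# prints [true, false, true, true]
```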

Performance and Resource Utilization

When it comes to resource utilization, each mechanism has distinct characteristics.

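Threads are comparatively expensive: each one reserves its own stack (on the order of a megabyte by default in CRuby on 64-bit platforms, depending on configuration) and relies on the operating system for context switching.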

While this makes them more resource-intensive, their preemptive scheduling makes them ideal for long-running tasks that need to share processor time fairly.

Fibers, in contrast, have a much smaller per-instance footprint than threads and are switched in user space rather than by the operating system.

This efficiency makes them perfect for applications that need to manage thousands of concurrent operations, such as web servers handling multiple simultaneous connections.

However, their cooperative nature means that poorly written fiber code can block other fibers from executing.

Ractors introduce additional overhead compared to threads but provide true parallelism in return.

Each Ractor runs its own thread(s) under its own lock, and objects passed between Ractors are copied or moved unless they are shareable, which makes Ractors more expensive than both threads and fibers.

However, this isolation enables them to fully utilize multiple CPU cores, making them invaluable for CPU-bound workloads.
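
One common way to size CPU-bound work is to create at most one chunk per available core. The following sketch uses Etc.nprocessors from the standard library for the default; chunks_for is a hypothetical helper, and the right worker count for a real workload is something to measure rather than assume.

```ruby
require 'etc'

# Sketch: split work into at most `workers` chunks of near-equal size.
# chunks_for is a hypothetical helper, not part of any library.
def chunks_for(items, workers: Etc.nprocessors)
  return [] if items.empty?

  slice_size = (items.size / workers.to_f).ceil
  items.each_slice(slice_size).to_a
end

chunks = chunks_for((1..10).to_a, workers: 4)
puts chunks.inspect
# prints [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
```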

Practical Implementation Considerations

When implementing concurrent systems, each mechanism requires different design approaches.

Thread-based systems need careful consideration of synchronization mechanisms like mutexes and locks to prevent race conditions.

This can make thread-based code more complex to write and debug, but threads remain valuable for their ability to handle blocking operations without stopping the entire program.
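
As a small illustration of the synchronization mentioned above, the sketch below guards a shared counter with a Mutex so that each read-modify-write happens atomically. The thread and iteration counts are arbitrary.

```ruby
# Sketch: a counter shared by several threads, guarded by a Mutex.
counter = 0
lock = Mutex.new

threads = Array.new(8) do
  Thread.new do
    1_000.times do
      # Only one thread at a time may run the read-modify-write below
      lock.synchronize { counter += 1 }
    end
  end
end
threads.each(&:join)

puts counter
# prints 8000
```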

Fiber-based systems require explicit yield points and careful attention to the execution flow.

While this might seem restrictive, it actually makes fiber-based code more predictable and easier to reason about.

The async/await pattern, commonly implemented using fibers, provides a clean and intuitive way to handle concurrent operations.
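
The explicit yield points described above can be sketched with plain Fibers and a hand-written round-robin loop. This is only an illustration of cooperative scheduling, not how the async gem is implemented; make_task is a hypothetical helper.

```ruby
# Sketch: a round-robin loop that resumes each fiber until all have finished.
def make_task(name, steps, log)
  Fiber.new do
    steps.times do |i|
      log << "#{name}:#{i}"
      Fiber.yield        # explicit yield point: let the next task run
    end
  end
end

log = []
tasks = [make_task('a', 2, log), make_task('b', 3, log)]

until tasks.empty?
  tasks.each(&:resume)
  tasks.select!(&:alive?)   # drop fibers whose block has returned
end

puts log.inspect
# prints ["a:0", "b:0", "a:1", "b:1", "b:2"]
```

Because control only changes hands at Fiber.yield, the order of log entries is the same on every run.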

Ractor-based systems demand a message-passing approach to communication, similar to actor-based concurrency models.

While this requires rethinking how components interact, it eliminates many traditional concurrency bugs by preventing shared state access.

This makes Ractors particularly suitable for parallel processing tasks where data can be cleanly partitioned.
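
A minimal sketch of the message-passing style: the main program sends input to a Ractor and reads back its result instead of sharing variables with it. Ractor#take is the API in Ruby 3.3; the fallback to Ractor#value is an assumption about newer Ruby versions that replace take, included so the sketch does not depend on one release.

```ruby
# Sketch: hand a Ractor its input by message instead of sharing state.
worker = Ractor.new do
  numbers = Ractor.receive          # blocks until a message arrives
  numbers.sum { |n| n * n }         # the block's value is the Ractor's result
end

worker.send([1, 2, 3])              # the array is copied into the worker

# Use take on Ruby 3.3; fall back to value where take is no longer available.
sum_of_squares = worker.respond_to?(:value) ? worker.value : worker.take
puts sum_of_squares
# prints 14
```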

Threads - Code Example

Threads in Ruby operate within the same process and share memory space. They're limited by the Global Interpreter Lock (GIL) but are excellent for I/O-bound operations.

class ThreadBasedProcessor
  def process_batch(items)
    threads = []
    results = Queue.new

    items.each do |item|
      threads << Thread.new do
        begin
          result = complex_calculation(item)
          results.push({ status: :success, item: item, result: result })
        rescue => e
          results.push({ status: :error, item: item, error: e.message })
        end
      end
    end

    threads.each(&:join)
    collect_results(results, items.size)
  end

  private

  def complex_calculation(item)
    sleep(0.1) # Simulate I/O operation
    item * 2
  end

  def collect_results(results, expected_count)
    Array.new(expected_count) { results.pop }
  end
end

Explanation: This example shows how to use threads for concurrent processing with error handling. It creates a thread for each item, performs a calculation, and collects results in a thread-safe queue. Each thread rescues its own errors, and all threads are joined before the results are read. Note that one thread per item is only reasonable for small batches, and that results arrive in completion order rather than input order.

Fibers - Code Example

Fibers are lightweight concurrency primitives that enable cooperative scheduling and are excellent for handling many concurrent operations without the overhead of threads.

require 'async'

class FiberBasedProcessor
  def process_async_batch(items)
    # Async runs the block in an event loop; wait returns the block's value
    Async do |task|
      # One child task (fiber) per item
      tasks = items.map do |item|
        task.async { process_item(item) }
      end

      # Task#wait returns each task's result, in the same order as items
      tasks.map(&:wait)
    end.wait
  end

  private

  def process_item(item)
    # Simulate async I/O; inside the event loop (async 2.x), sleep yields to other fibers
    sleep(0.1)
    { item: item, processed_at: Time.now }
  end
end

Explanation: This code demonstrates using Fibers with the async gem for concurrent processing. It creates one lightweight task (fiber) per item, runs them on a single event loop, and collects each task's result with wait. No mutex is needed because the fibers run cooperatively on one thread. Fibers are particularly efficient for I/O-bound operations as they consume less memory than threads.

Ractors - Code Example

Ractors enable true parallel execution with isolated memory spaces, making them ideal for CPU-bound operations.

Unlike traditional threads in Ruby, which are limited by the Global Interpreter Lock (GIL), Ractors allow parallel execution without memory sharing issues.

This isolation of memory spaces enables efficient parallel processing, especially for tasks that require intensive CPU usage.

class RactorBasedProcessor
  # Defined on the class so that code running inside a Ractor can call it
  # (inside a Ractor block, self is the Ractor, not this processor)
  def self.complex_math(n)
    (1..1000).reduce(n) { |sum, i| sum + Math.sqrt(i ** 2) }
  end

  def process_parallel_batch(items, worker_count = 4)
    return {} if items.empty?

    # Divide items into chunks for each Ractor
    chunks = items.each_slice((items.size.to_f / worker_count).ceil).to_a

    # Create one Ractor per chunk
    ractors = chunks.map do |chunk|
      Ractor.new(chunk) do |data|
        # Process each item in the chunk
        data.map do |item|
          [item, RactorBasedProcessor.complex_math(item)]
        end
      end
    end

    # Collect results from each Ractor
    results = {}
    ractors.each do |ractor|
      ractor.take.each do |item, result|
        results[item] = result
      end
    end
    results
  end
end

# Usage example
processor = RactorBasedProcessor.new
result = processor.process_parallel_batch([1, 2, 3, 4, 5, 6, 7, 8, 9, 0])
puts result

Explanation: This implementation demonstrates Ractors performing CPU-intensive calculations in parallel by dividing an array into chunks and processing each chunk in separate Ractors. Each Ractor runs independently, performing complex calculations on its assigned data, and returns the results. Unlike threads, Ractors can achieve true parallelism by bypassing the Global Interpreter Lock (GIL), making them ideal for CPU-bound tasks. The computation lives in a class-level method because a Ractor block cannot reach the instance that created it, while methods on a class can be called from any Ractor.

Why Is Concurrency Required in AI and ML?

Concurrency plays a crucial role in AI and ML for several compelling reasons:

  1. Data Processing Efficiency

    • Large datasets require parallel processing

    • Multiple data streams need concurrent handling

    • Real-time data processing demands

  2. Model Training Optimization

    • Parallel training of multiple models

    • Concurrent hyperparameter tuning

    • Distributed learning processes

  3. Resource Utilization

    • Efficient use of available CPU cores

    • Better memory management

    • Improved throughput for computational tasks

  4. Response Time Requirements

    • Real-time prediction serving

    • Concurrent user request handling

    • Batch processing optimization

  5. Scalability Needs

    • Horizontal scaling capabilities

    • Load distribution

    • Resource allocation flexibility

Example 1: Concurrent OpenAI API Processing

require 'openai'
require 'concurrent-ruby'

class ConcurrentAIProcessor
  def initialize(api_key)
    @client = OpenAI::Client.new(access_token: api_key)
    @pool = Concurrent::FixedThreadPool.new(5)  # Limit concurrent API calls
  end

  def process_batch_queries(queries)
    promises = queries.map do |query|
      Concurrent::Promise.execute(executor: @pool) do
        begin
          response = @client.chat(
            parameters: {
              model: "gpt-3.5-turbo",
              messages: [{ role: "user", content: query }],
              temperature: 0.7,
              max_tokens: 150
            }
          )

          { 
            status: :success,
            query: query,
            response: response.dig('choices', 0, 'message', 'content'),
            tokens: response.dig('usage', 'total_tokens')
          }
        rescue => e
          {
            status: :error,
            query: query,
            error: e.message
          }
        end
      end
    end

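    # Wait for all promises and collect results
    results = promises.map(&:value!)
    # Note: shutting the pool down here makes this processor single-use;
    # move the shutdown to a separate method if batches are submitted repeatedly.
    @pool.shutdown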

    # Analyze results
    {
      successful: results.count { |r| r[:status] == :success },
      failed: results.count { |r| r[:status] == :error },
      total_tokens: results.sum { |r| r[:tokens].to_i },
      responses: results
    }
  end
end

# Usage example
processor = ConcurrentAIProcessor.new('your-api-key')
queries = [
  "Explain quantum computing in simple terms",
  "What is machine learning?",
  "How does natural language processing work?"
]
results = processor.process_batch_queries(queries)

Explanation: This example showcases concurrent processing of OpenAI API requests using a thread pool. The pool size caps the number of in-flight requests (it limits concurrency, not requests per minute), errors are rescued per query, and the method returns summary counts and total token usage for the batch. The implementation uses Concurrent::Promise to run each request on the pool.

Example 2: Parallel ML Model Training with Ractors

class MLModelTrainer
  def train_models_in_parallel(training_data, model_configs)
    # Split data for cross-validation
    data_folds = create_cross_validation_folds(training_data, folds: 5)

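    # Create Ractors for parallel model training
    ractors = model_configs.map do |config|
      Ractor.new(data_folds, config) do |folds, model_params|
        # Inside a Ractor, self is the Ractor, so the trainer's instance methods
        # are not reachable directly. Build a trainer local to this Ractor and
        # call its private helpers through send.
        worker = MLModelTrainer.new
        results = folds.map do |fold|
          {
            params: model_params,
            metrics: worker.send(:train_and_evaluate, fold[:train], fold[:test], model_params)
          }
        end

        # Average metrics across folds
        avg_metrics = worker.send(:calculate_average_metrics, results)
        [model_params, avg_metrics]
      end
    end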

    # Collect results from all Ractors
    results = ractors.map(&:take).to_h
    select_best_model(results)
  end

  private

  def train_and_evaluate(train_data, test_data, params)
    # Simulate ML model training
    model = initialize_model(params)
    history = train_model(model, train_data, params)

    # Evaluate on test data
    {
      accuracy: evaluate_accuracy(model, test_data),
      f1_score: calculate_f1_score(model, test_data),
      training_time: history[:training_time],
      convergence_epoch: history[:convergence_epoch]
    }
  end

  def evaluate_accuracy(model, test_data)
    # Placeholder (not a real metric): a real implementation would compare
    # the model's predictions against test_data
    1.0 - model[:learning_rate]
  end

  def calculate_f1_score(model, test_data)
    # Placeholder (not a real metric), same caveat as evaluate_accuracy
    0.9 - model[:learning_rate]
  end

  def initialize_model(params)
    # Simulate model initialization with hyperparameters
    {
      learning_rate: params[:learning_rate],
      layers: params[:layers],
      activation: params[:activation]
    }
  end

  def train_model(model, data, params)
    epochs = params[:epochs] || 100
    start_time = Time.now

    # Simulate training loop
    convergence_epoch = (epochs * 0.7).to_i # Simulate early convergence
    epochs.times do |epoch|
      # Simulate epoch training
      sleep(0.01) # Simulate computation time
      break if epoch >= convergence_epoch
    end

    {
      training_time: Time.now - start_time,
      convergence_epoch: convergence_epoch
    }
  end

  def create_cross_validation_folds(data, folds:)
    # Simulate splitting data into folds
    folds.times.map do |i|
      {
        train: data.select.with_index { |_, idx| idx % folds != i },
        test: data.select.with_index { |_, idx| idx % folds == i }
      }
    end
  end

  def calculate_average_metrics(fold_results)
    metrics = fold_results.map { |r| r[:metrics] }
    {
      avg_accuracy: metrics.sum { |m| m[:accuracy] } / metrics.size,
      avg_f1_score: metrics.sum { |m| m[:f1_score] } / metrics.size,
      avg_training_time: metrics.sum { |m| m[:training_time] } / metrics.size,
      avg_convergence_epoch: metrics.sum { |m| m[:convergence_epoch] } / metrics.size
    }
  end

  def select_best_model(results)
    # Select the configuration with the highest average accuracy
    best_config = results.max_by { |_, metrics| metrics[:avg_accuracy] }
    {
      best_config: best_config[0],
      metrics: best_config[1]
    }
  end
end

# Usage example
trainer = MLModelTrainer.new
training_data = (1..1000).map { |i| [i, i * 2] } # Simulate dataset

model_configs = [
  { learning_rate: 0.01, layers: [64, 32], activation: 'relu', epochs: 100 },
  { learning_rate: 0.001, layers: [128, 64], activation: 'tanh', epochs: 100 },
  { learning_rate: 0.005, layers: [32, 16], activation: 'relu', epochs: 100 }
]

best_model = trainer.train_models_in_parallel(training_data, model_configs)

Explanation: This example uses Ractors to run model training in parallel. Each Ractor handles a complete cross-validation cycle for a specific model configuration, enabling parallel exploration of different hyperparameter combinations. The code includes k-fold cross-validation, metrics calculation, and best model selection. The training itself is simulated with sleep; with real CPU-intensive training code, using Ractors instead of Threads allows the configurations to be trained on separate cores.

Conclusion

Ruby's concurrency features represent a significant evolution in the language's capabilities for handling parallel and concurrent operations.

The introduction of Ractors, alongside the mature Thread and Fiber implementations, provides developers with a robust toolkit for building efficient, scalable applications.

Key takeaways from this exploration:

  1. Choose the right concurrency primitive based on your specific use case:

    • Threads for I/O-bound operations

    • Fibers for lightweight concurrency

    • Ractors for CPU-bound parallel processing

  2. Consider the trade-offs between complexity and performance when implementing concurrent solutions

  3. Leverage the rich ecosystem of concurrent programming tools and libraries available in Ruby

  4. Pay special attention to concurrency when building AI and ML applications, as it can significantly impact performance and scalability

As Ruby continues to evolve, its concurrency capabilities will likely expand further, making it an increasingly powerful choice for building modern, concurrent applications, particularly in the domains of AI and ML.
