From Failure to Flow: How I Used Polars to Conquer Memory Issues in Our Data Pipelines


Ever been bogged down by data pipelines crashing due to memory issues? It's a frustratingly common problem in data engineering projects. This post chronicles my experience of identifying and resolving memory bottlenecks in our data processing using the powerful Polars library, while adhering to data engineering best practices.
The Situation
The project involved generating Monte Carlo Projections using Geometric Brownian Motion (GBM) to create thousands of potential future price paths for assets. This helps analysts estimate the probabilities of various outcomes.
First, I would like to provide a concise explanation of the components involved in this pipeline.
Pipeline Components
Step 1: Generate a random normal distribution and then perform a cumulative sum over it.
Step 2: Using the Covariance matrix and two sets of random normal distributions, compute the Einstein summation.
Step 3: Incorporate both deterministic trends (drift) and random fluctuations (volatility) using GBM (a minimal sketch of this step follows the list).
Step 4: Perform mean aggregation & select based on filter criteria.
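For anyone less familiar with GBM, here is a minimal single-path sketch of the closed form the pipeline relies on, S_t = s0 * exp((mu - 0.5 * sigma**2) * t + sigma * W_t). The parameter values below are purely illustrative, not the project's actual inputs.
import numpy as np

rng = np.random.default_rng()

# Illustrative parameters only (not the project's real values)
s0, mu, sigma = 100.0, 0.05, 0.26   # starting value, drift, volatility
days, years = 365, 1

t = np.arange(1, days * years + 1) / days                      # time grid in years
w = np.cumsum(rng.normal(0, np.sqrt(1 / days), days * years))  # Brownian path via cumulative sum
path = s0 * np.exp((mu - 0.5 * sigma**2) * t + sigma * w)      # one GBM price path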
Issue
The initial version used the NumPy & Pandas libraries for all computations. We have to run the projection for at least 100 years, and often for even longer. Let's break down how the computation looks based on the steps above:
import numpy as np

rng = np.random.default_rng()

DAYS = 365
YEARS = 100
PATH = 20_000

# Generating random cumulative-sum normal distributions
# NOTE - We required two such normal distribution sets
nor_dis_random1 = np.cumsum(
    rng.normal(0, np.sqrt(1 / DAYS), (DAYS * YEARS, PATH)), axis=0
)
nor_dis_random2 = np.cumsum(
    rng.normal(0, np.sqrt(1 / DAYS), (DAYS * YEARS, PATH)), axis=0
)

# Computing the Einstein summation
# covariance_matrix_data is the assets' covariance matrix, computed elsewhere in the pipeline
ein_corr = np.einsum(
    "ij,jkl->ikl",
    covariance_matrix_data,
    np.array([nor_dis_random1, nor_dis_random2]),
    casting="same_kind",
    optimize=True,
)
As you can see above, we had to deal with extremely big arrays.
NumPy has to allocate memory for the random normal distribution generation, and the business logic requires two such big arrays to compute the Einstein summation. For 100 years and above, it was failing with an Out of Memory error even after beefing the machine up to 64 GB.
Following the Einstein summation task, numerous transformations, including the application of GBM, were performed. This was initially done using Pandas, which made the already memory-hogging code even bulkier.
import pandas as pd

# Creating two Pandas DataFrames from the Einstein summation array
projection_a = pd.DataFrame(ein_corr[0].T).cumsum(axis=0)
projection_b = pd.DataFrame(ein_corr[1].T).cumsum(axis=0)

# Further transformation (applying the GBM drift & volatility terms)
sigma_s = 0.26
mu_s = 0.05
s0 = 391555

projection_a = s0 * np.exp((mu_s - 0.5 * sigma_s**2) + sigma_s * projection_a)
projection_b = s0 * np.exp((mu_s - 0.5 * sigma_s**2) + sigma_s * projection_b)
It became simply unsustainable to keep increasing memory.
The Tasks
Let me show you how I divided the problem into smaller tasks and solved them, aiming for a cohesive and efficient process.
Assessment
The moment I stepped into this project, I realized that Pandas wasn't the right tool for the job. Both NumPy and Pandas store everything in memory from start to finish, and every transformation adds to memory use.
While exploring possible solutions, I wondered about using PySpark because of its ability to handle distributed workloads. But then I stumbled upon two significant issues:
- It doesn't have first-class support for running NumPy functions across a cluster.
- The environment is costly and bulky since Spark is cluster-based.
Switching to Spark would have required a lot more work. That's where Polars comes in to save the day. Here are the key reasons why I chose it:
- Like PySpark's lazy evaluation, Polars supports laziness through its LazyFrame.
- First-class support for NumPy functions, even in a LazyFrame (a short illustration follows this list).
- Runs on a single machine and generally offers excellent performance, since it is built on Rust and uses parallel processing internally.
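To make the NumPy point concrete, here is a tiny, generic illustration (not taken from the project's code): NumPy ufuncs can be applied directly to Polars expressions, and the result simply becomes another node in the lazy query plan.
import numpy as np
import polars as pl

# NumPy ufuncs dispatch onto Polars expressions, so np.log / np.sqrt below
# stay inside the lazy query plan instead of materializing anything.
lf = pl.LazyFrame({"x": [1.0, 2.0, 3.0]}).with_columns(
    log_x=np.log(pl.col("x")),
    sqrt_x=np.sqrt(pl.col("x")),
)
print(lf.collect())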
Redesigned Pipeline components
Step 1: Replace all Pandas DataFrame code with Polars LazyFrame.
Step 2: Since Polars fully supports NumPy functions, keep using the specific NumPy functions that Polars doesn't have built in.
Step 3: Switch to batch processing for the transformations. Ensure each batch creates a LazyFrame, so nothing is stored in memory until the final execution.
Step 4: Keep adding transformation steps to the LazyFrame and execute them at the end. This approach lets Polars excel by making the most of its Lazy API.
With the lazy API, Polars doesn't run each query line-by-line but instead processes the full
query end-to-end. To get the most out of Polars, it is important to use the lazy API because (a small illustration follows these points):
- The lazy API allows Polars to apply automatic query optimization with the query optimizer.
- The lazy API allows you to work with larger than memory datasets using streaming.
- The lazy API can catch schema errors before processing the data.
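As a small, generic illustration of these points (not the project's code; method names such as collect_schema() may vary slightly between Polars versions):
import polars as pl

lf = (
    pl.LazyFrame({"a": [1, 2, 3], "b": [10.0, 20.0, 30.0]})
    .filter(pl.col("a") > 1)
    .with_columns(c=pl.col("b") * 2)
    .select("a", "c")
)

print(lf.explain())         # inspect the optimized query plan before any work is done
print(lf.collect_schema())  # the schema (and schema errors) are resolved without executing
df = lf.collect()           # only now is the whole query actually executed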
The Actions
Enough theory; now let's dive into the real, code-based actions that solved all the issues above.
NumPy functions + Polars LazyFrame
import polars as pl
import numpy as np
from alive_progress import alive_it  # progress bar helper used in the loops below

rng = np.random.default_rng()
DAYS = 365
YEARS = 100
PATH = 20_000
def unit_batch(fixed_years=10):
    return (
        pl.LazyFrame()
        .with_columns(
            # Here I am directly using NumPy's normal() & cumsum() in a Polars LazyFrame since
            # it has first-class support for them
            nor_dis_random=np.cumsum(
                rng.normal(0, np.sqrt(1 / DAYS), (DAYS * fixed_years, PATH)).astype(np.float32),
                axis=0,
                dtype=np.float32,
            )  # I am using Float32 as the data type instead of the default Float64
        )
        # I need to create a LazyFrame of shape (x, y) from normal distribution data of shape (x, y)
        # Initially `nor_dis_random` is an Array-type column, which I explode into y columns
        # The beauty of the Polars Lazy API is that we can keep adding steps to the query plan.
        .with_columns(pl.col("nor_dis_random").arr.to_struct().alias("array_struct"))
        .unnest("array_struct")
        .drop("nor_dis_random")
    )
# Splitting the projection horizon into fixed 10-year chunks and creating a stack of
# per-chunk LazyFrames that will eventually get concatenated
chunks = [10] * (YEARS // 10)
normal_dist_random_cum_sum_frames = [
    unit_batch(i)
    for i in alive_it(
        chunks, title="Normal Distribution Cumulative sum", force_tty=True, total=len(chunks)
    )
]
# This step is very important: here I am vertically stacking all batches, but still as a LazyFrame.
# The same pattern is repeated to build the second set, normal_dist2.
normal_dist1 = pl.concat(normal_dist_random_cum_sum_frames, how="vertical")
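Since the concatenation is just another node in the query plan, the combined frame can be sanity-checked without collecting anything. A quick way to do that, assuming the variable name above and a recent Polars version:
# Inspecting the plan and schema does not trigger execution
print(normal_dist1.explain())
print(normal_dist1.collect_schema().names())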
Batch Processing on Polars LazyFrame
BATCH_SIZE = 1_000
def calculate_no_of_batches(row_count: int, batch_size: int = 10) -> int:
    # Ceiling division: one extra batch if there is a partial remainder
    return row_count // batch_size + (row_count % batch_size > 0)
ein_corr_a, ein_corr_b = [], []
no_of_batches = calculate_no_of_batches(
normal_dist1.select(pl.len()).collect().item(), batch_size=BATCH_SIZE
)
# Batch processing loop
for batch in alive_it(range(no_of_batches), title="Einstein summation", force_tty=True):
    # The Polars Lazy API provides us with slice() to iterate over a LazyFrame without actually
    # loading the data in memory. It's similar to SQL OFFSET & LIMIT.
    brownian_path_array1 = normal_dist1.slice(batch * BATCH_SIZE, BATCH_SIZE)
    brownian_path_array2 = normal_dist2.slice(batch * BATCH_SIZE, BATCH_SIZE)
    # Stack the two operands as an array to calculate the Einstein summation convention
    einstein_summation_operands = np.array(
        [
            brownian_path_array1.collect().to_numpy().T,
            brownian_path_array2.collect().to_numpy().T,
        ],
        dtype=np.float32,
    )
    ein_corr = np.einsum(
        "ij,jkl->ikl",
        covariance_matrix_data,
        einstein_summation_operands,
        dtype=np.float32,
        casting="same_kind",
        optimize=True,
    )
    # Creating LazyFrames from the result & performing a cumulative sum over every column
    # Once again, the expressive Polars Lazy API makes the code cleaner & more readable.
    ein_corr_a.append(pl.LazyFrame(ein_corr[0].T).with_columns(pl.all().cum_sum()))
    ein_corr_b.append(pl.LazyFrame(ein_corr[1].T).with_columns(pl.all().cum_sum()))
# Just like above, at the end all the smaller LazyFrame batches are
# concatenated into a unified LazyFrame. Still, nothing is stored in memory.
ein_corr_frame1 = pl.concat(ein_corr_a, how="vertical")
ein_corr_frame2 = pl.concat(ein_corr_b, how="vertical")
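A rough back-of-the-envelope check of this design, using the constants above: each slice materializes only BATCH_SIZE x PATH Float32 values, i.e. roughly 1,000 x 20,000 x 4 bytes ≈ 80 MB per operand, so the two operands plus the einsum output (assuming a small covariance matrix, as here) stay at a few hundred megabytes per iteration, no matter how many years the projection covers.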
Building the data transformation query graph
- Computing GBM paths
# Get column names from ein_corr_frame1 for explicit transformations
col_names = ein_corr_frame1.collect_schema().names()

ein_corr_frame1 = (
    ein_corr_frame1
    # Step 1: Calculate the drift term + volatility term for each column
    .with_columns(
        [
            (
                sigma_s * pl.col(col_name) + (mu_s - 0.5 * sigma_s**2) * pl.col("real_number_time")
            ).alias(f"{col_name}_gbm")
            for col_name in col_names
        ]
    )
    # Step 2: Apply the exponential function and scale by s0
    .with_columns(
        [
            (pl.col(f"{col_name}_gbm").exp() * s0).alias(f"Path_{i + 1}")
            for i, col_name in enumerate(col_names)
        ]
    )
    # Step 3: Keep only the final sales path columns
    .select([f"Path_{i + 1}" for i in range(len(col_names))])
)
- Compute the mean of the GBM paths & use dynamic scaling to fix the infinite-number issue (each row is scaled down by its max value before averaging so the Float32 row sums don't overflow to infinity, and the mean is then scaled back up)
# Get column names from ein_corr_frame1 for explicit transformations
col_names = ein_corr_frame1.collect_schema().names()

ein_corr_frame1 = (
    ein_corr_frame1.with_row_index("_idx", 1)
    # Select only the row numbers present in year_end_location_time
    # (time_step is a LazyFrame built earlier in the pipeline)
    .filter(
        pl.col("_idx").is_in(
            time_step.select("year_end_location_time").drop_nulls().collect().to_series()
        )
    )
    .drop("_idx", strict=False)
    # Handle infinities by replacing them with None
    .with_columns(
        [
            pl.when(pl.col(c).is_infinite()).then(None).otherwise(pl.col(c)).alias(c)
            for c in col_names
        ]
    )
    # Calculate the max value per row for the dynamic scaling ops
    .with_columns(max_value=pl.max_horizontal(pl.all()))
    # Performing the dynamically scaled mean: scale down --> mean --> scale up
    .with_columns(
        Sales=pl.mean_horizontal(  # taking the mean of the entire row
            # dividing all columns by the max value (scale down)
            pl.all().exclude("max_value") / pl.col("max_value")
        )
        # multiplying the scaled-down mean by the max value (scale up)
        * pl.col("max_value")
    )
    # We only need the 'Sales' column
    .select("Sales")
)
- Materializing the entire query plan into the final output
# Till now we have performed many heavy math-based calculations, but everything is deferred.
# Running collect() will materialize the whole query plan into the final DataFrame.
ein_corr_frame1.collect()
Key Points:
- Polars' expressive Lazy API is very powerful, clean & highly readable.
- From the above you can easily see that I simply keep adding steps to the query plan without actually running them yet. This gives Polars' engine many opportunities to optimize the query.
- Until we run LazyFrame.collect(), all the queries are deferred & nothing is stored in memory.
The Results
The entire 100-year Monte Carlo simulation was able to complete in under 20-25 GB of memory, whereas the Pandas version was failing even after being given 64 GB.
An added benefit was the reduced total runtime, because Polars is built on Rust & internally uses parallel processing to run queries.
The expressive API of the Polars library is very powerful & intuitive, especially for data engineers coming from the SQL world.
This experience underscores the importance of choosing the right tools and approaches in data engineering to achieve optimal performance and efficiency.