The importance of not using nulls for merge keys


Last week I was working on a Spark pipeline that was running slowly, and I discovered that a specific task with significant skew was the cause.
Googling the problem didn't return any meaningful results, so I had to figure it out myself.
Here's what I found and how I fixed it.
Understanding SCD2 with Delta Lake
The pipeline was an SCD2 incremental build, something that every data engineer has to deal with sooner or later.
Frameworks like Delta Lake make this strategy possible inside a data lake, thanks to a simple API that provides update and merge capabilities over otherwise immutable data.
Merging a sequence of data changes to a base table is relatively straightforward:
1. Identify the type of change (INSERT, UPDATE, DELETE) for each record in the sequence
2. Prepare it to be merged
3. Run the merge operation
The second step is crucial because UPDATEs are treated specially. An UPDATE record must be used twice during the merge:
1. To mark the old record in the base table as expired (setting is_current = false and an end date).
2. To insert the updated values as a new record.
Delta's merge operation begins with setting a merge condition, usually based on a primary key (e.g. base.primary_key = updates.primary_key). Then, it has two main actions:
- whenMatchedUpdate: for records that meet the merge condition
- whenNotMatchedInsert: for records that don't meet the merge condition
We need to make sure that records of type UPDATE fall under both of them. In other words, the record must both match and not match the merge condition at once.
To obtain this result, for these records we create duplicates that can never satisfy the whenMatched condition. Usually, this is done by setting merge_key = null, and then appending these duplicates to the rest of the data we're merging. This is how it's recommended in Delta's official documentation.
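To make the pattern concrete, here is a minimal sketch of an SCD2 merge built this way. It is not the pipeline's actual code: the customers table name, the updates DataFrame, the existing spark session, and the id, value, change_type, is_current and end_date columns are all illustrative assumptions, and DELETE handling is left out.

from delta.tables import DeltaTable
import pyspark.sql.functions as F

base = DeltaTable.forName(spark, "customers")

# Duplicate UPDATE records with a null merge_key so they can only reach the insert branch
inserts = (
    updates
    .filter(F.col("change_type") == "UPDATE")
    .withColumn("merge_key", F.lit(None).cast("long"))
)
staged = updates.withColumn("merge_key", F.col("id")).unionByName(inserts)

(
    base.alias("base")
    .merge(staged.alias("staged"), "base.id = staged.merge_key AND base.is_current = true")
    .whenMatchedUpdate(set={"is_current": "false", "end_date": "current_date()"})
    .whenNotMatchedInsert(values={
        "id": "staged.id",
        "value": "staged.value",
        "is_current": "true",
        "end_date": "null",
    })
    .execute()
)

Because the duplicated rows carry a null merge_key, the equality in the merge condition can never evaluate to true for them, so they always fall through to whenNotMatchedInsert, while their originals match and get expired.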
Null merge keys create skewed partitions
By setting merge_key = null, we're artificially creating skew: since a Delta merge is effectively a JOIN, Spark will shuffle data based on the join/merge condition (unless it is broadcast), and will send all records with merge_key = null to a single partition, using only a single core to perform an operation that could otherwise be parallelized.
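One quick way to see this for yourself (a hypothetical check, reusing the staged DataFrame from the sketch above) is to repartition by the merge key the way a shuffle join would and count how many rows land in each partition:

import pyspark.sql.functions as F

(
    staged
    .repartition("merge_key")  # hash-partition on the merge key, like a shuffle join would
    .groupBy(F.spark_partition_id().alias("partition_id"))
    .count()
    .orderBy(F.desc("count"))
    .show(5)
)
# Every row with merge_key = null hashes to the same partition, so one
# partition dwarfs all the others.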
The above screenshot is taken from the build I was working on: the long green bar represents the task where all the records with the null merge key ended up, with its length representing the task's duration.
It took far longer to execute, and it processed an order of magnitude more records than any other task in that job.
Setting the merge key to a valid value
Fortunately, the fix is simple: we need to set the merge_key for those records to something that isn't null, can never match any of the primary keys, and is evenly distributed. If the primary key is a string, we can set the merge_key like this:
import pyspark.sql.functions as F

# Prefix the primary key: the result can never collide with a real id, but keeps its distribution
df_inserts = df.withColumn(
    "merge_key", F.concat(F.lit("__INSERT__"), F.col("id"))
)
This keeps the original distribution of data based on the primary key, while also ensuring that the merge_key can never match the original pk.
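For context, this is roughly where the fixed rows slot into the merge (a sketch assuming, as in the snippet above, that df holds the duplicated UPDATE rows and updates holds the full change set; the merge condition itself stays base.id = staged.merge_key):

# Rows that should match keep merge_key = id; the duplicated UPDATE rows keep the prefixed key
df_matches = updates.withColumn("merge_key", F.col("id"))
staged = df_matches.unionByName(df_inserts)
# The prefixed keys never equal an existing id, so those rows still fall through to
# whenNotMatchedInsert, but they now hash evenly across the shuffle partitions.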
However, if the primary key is a number, we can't use the method above. That's because during the merge Spark implicitly casts the string merge_key to a numeric type in order to compare it with the numeric primary key, and casting a non-numeric string to a number yields null, so we'd be right back where we started.
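You can reproduce the cast behaviour on its own (assuming spark.sql.ansi.enabled is false, the long-standing default; with ANSI mode on the cast raises an error instead):

# Casting a non-numeric string to a numeric type silently returns null
spark.sql("SELECT CAST('__INSERT__42' AS BIGINT) AS casted").show()
# The single row comes back as null, so a comparison against it behaves just
# like a null merge_key would.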
In this case, we need to set our merge_key to be a number, while still making sure it can never match the original pk. This can be done by making it negative (assuming the pk is always a positive integer). Here's the example code:
# Shift the key into the negatives: it can never collide with a (positive) id,
# but it still varies with the id, keeping the distribution even
df_inserts = df.withColumn(
    "merge_key", -100_000_000 - F.col("id")
)
Like the string case above, this keeps the same distribution of data while satisfying all the other requirements.
Applying the fix to the pipeline removed the skew entirely, as the data could now be properly distributed across all executors. The total run time was cut in half.
In the end, this is no different from salting join keys in a regular Spark pipeline. The source of confusion comes from using the Delta API, which abstracts away the underlying Spark join and makes it a little less obvious where the problem hides.