Data Manipulation with PySpark

Spark has become the de facto tool for processing large amounts of data. It is a distributed, in-memory engine with connectors for numerous data stores, which makes it scalable, fast and flexible. Platforms like Databricks are built on Spark, and Snowflake's Snowpark offers a similar DataFrame API, but it is still possible to use the open source Apache Spark distribution on its own.

In this article, we will demonstrate the capability of Spark for data wrangling by exploring the TMDB dataset from Kaggle. Data manipulation language (DML) is a subset of SQL that makes data wrangling easy, and its statements translate readily to PySpark DataFrames.

Setup

Getting started with PySpark locally is fairly easy. Download the latest version from the Apache Spark site and set the SPARK_HOME environment variable for easier access:

export SPARK_HOME=/path/to/unzipped/spark/folder
export PATH=$SPARK_HOME/bin:$PATH

This assumes that Java has already been installed and the JAVA_HOME environment variable has been set as well. It is also usually helpful to update the PYTHONPATH environment variable so that the installed Python interpreter can find the pyspark module and import it into a script.

export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
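Before launching a notebook, it can help to confirm that these variables are actually visible to Python. The following is a minimal sketch using only the standard library. The helper name `missing_env_vars` is hypothetical and is not part of Spark or findspark.

```python
import os

# Variables that the setup in this article relies on.
REQUIRED_VARS = ("SPARK_HOME", "JAVA_HOME")


def missing_env_vars(env, required=REQUIRED_VARS):
    """Return the required variable names that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(os.environ)
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("SPARK_HOME and JAVA_HOME are both set.")
```

Passing the environment in as an argument keeps the check easy to try against a plain dictionary as well as `os.environ`.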

While spark-submit can be used to run PySpark apps, in this case we will use Jupyter notebooks (VS Code and other IDEs now support notebooks as well) to run the commands. JupyterLab can be installed using any of the available package managers, such as pip or poetry; uv is our tool of choice.

uv init pyspark-project
cd pyspark-project
uv add jupyterlab
uv add findspark

Once everything is installed, we can fire up jupyter-lab and start our data wrangling. The first thing we need to do is get our Spark Session up and running. It is the entry point to most of the data manipulation features of Spark. You can add configuration details while creating a Spark Session object, and runtime settings such as spark.sql.shuffle.partitions can also be changed later through spark.conf.set. Below, we set executor memory to a static 2 GB and reduce shuffle partitions from the default of 200 to 5, since our dataset is small.

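import findspark
findspark.init()  # make the pyspark package under SPARK_HOME importable

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F, types as T

spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", 5)
    .config("spark.executor.memory", "2g")
    .appName('pyspark_quickstart')
    .getOrCreate()
)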

I/O

It is easy to read files, tables and databases with Spark. In our example, we read the TMDB dataset, a CSV file, into a Spark dataframe. Although Spark can infer the schema of the dataset, it is recommended to create the schema explicitly and pass it to the read call.

# schema creation
schema = T.StructType([
    T.StructField('id', T.StringType(), False),
    T.StructField('title', T.StringType()),
    T.StructField('vote_average', T.FloatType()),
    T.StructField('vote_count', T.IntegerType()),
    T.StructField('status', T.StringType()),
    T.StructField('release_date', T.DateType()),
    T.StructField('revenue', T.LongType()),  # LongType: top-grossing revenues exceed the 32-bit integer range
    T.StructField('runtime', T.IntegerType()),
    T.StructField('adult', T.BooleanType()),
    T.StructField('backdrop_path', T.StringType()),
    T.StructField('budget', T.IntegerType()),
    T.StructField('homepage', T.StringType()),
    T.StructField('imdb_id', T.StringType()),
    T.StructField('original_language', T.StringType()),
    T.StructField('original_title', T.StringType()),
    T.StructField('overview', T.StringType()),
    T.StructField('popularity', T.StringType()),
    T.StructField('poster_path', T.StringType()),
    T.StructField('tagline', T.StringType()),
    T.StructField('genres', T.StringType()),
    T.StructField('production_companies', T.StringType()),
    T.StructField('production_countries', T.StringType()),
    T.StructField('spoken_languages', T.StringType()),
    T.StructField('keywords', T.StringType())
]
)


# reading the csv
# Spark does not expand '~', so resolve the home directory first
import os
df_tmdb = spark.read.option('header', 'true')\
            .csv(os.path.expanduser('~/datasets/TMDB_movie_dataset.csv'), schema=schema)

Projection, Filtering and Expression

These are some of the most basic data manipulation operations. Projection and expression are about choosing the fields to return, whether stored or derived (the SELECT statement in SQL). Filtering returns the subset of records that meet defined criteria.

# Projection using the select method
# Get the columns: title, status, budget, revenue, vote_average and release_date from the data
df_tmdb.select(F.col('title').alias('movie_title'), 'status', 'budget', 'revenue',
               'vote_average', 'release_date').show(5, truncate=False)



# Expression using the selectExpr method. Note the SQL-like expressions
# Get the title, release year, revenue and whether the movie grossed at least 1M in revenue.
df_tmdb.selectExpr("title", "year(release_date)", "revenue", 
                    "IF(revenue >= 1000000, true, false) as higher_revenue")\
           .orderBy('revenue', ascending=False).show()



# Filter
# Single condition example: Get the movie with title "Lethal Attractions"
df_tmdb.where(F.col('title')=='Lethal Attractions')\
        .select('title', 'status', 'budget', 'vote_average', 'release_date').show()


# Multiple conditions: Get all movies with a runtime of at most 1 hour (but above 0 minutes)
# and with budget over 200M
df_tmdb.filter((F.col('runtime') <= 60) 
                & (F.col('runtime') > 0) 
                & (F.col('budget') > 200000000))\
                .select('title', 'budget', 'runtime', 'genres').show()

Group By and Aggregation

In analytics, aggregations across dimensions are the metrics that show progress against business goals. Spark makes this easy by providing a groupBy method on the dataframe object and all the usual aggregation functions.

# Get the max budget
df_tmdb.agg({"budget":"max"}).show()

# Get the summary stats for the movie budgets
# (F.median needs Spark 3.4+ and F.percentile needs Spark 3.5+)
df_tmdb.select(F.min('budget').alias('min_budget'),
               F.median('budget'), F.mean('budget'),
               F.percentile('budget', [0.25, 0.5, 0.75, 0.9, 0.95, 0.99]).alias('percentiles'),
               F.max(df_tmdb.budget)).show(truncate=False)

# Get the average and standard deviation of budgets by release year
df_tmdb.groupBy(F.year(df_tmdb.release_date).alias('release_year'))\
    .agg(F.stddev('budget').alias('budget_std'), F.avg('budget').alias('budget_avg'))\
    .select('release_year', F.round('budget_avg', 2), F.round('budget_std', 2))\
    .orderBy('release_year', ascending=False)\
    .show()
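To make the last query concrete, here is a plain-Python model of what a group-by followed by an aggregation computes. It is a sketch for intuition only, not Spark code: the rows are made up, and the helper `budget_stats_by_year` is a hypothetical name. It uses `statistics.stdev`, the sample standard deviation, because `F.stddev` in Spark is an alias for the sample version (`stddev_samp`).

```python
from collections import defaultdict
from statistics import mean, stdev

# Made-up (release_year, budget) rows, purely for illustration.
rows = [(2020, 100), (2020, 300), (2021, 50), (2021, 150), (2021, 250)]


def budget_stats_by_year(rows):
    """Group budgets by year, then compute the average and sample std dev."""
    groups = defaultdict(list)
    for year, budget in rows:
        groups[year].append(budget)
    # statistics.stdev needs at least two values per group; the sample data has them.
    return {
        year: (round(mean(budgets), 2), round(stdev(budgets), 2))
        for year, budgets in groups.items()
    }


stats = budget_stats_by_year(rows)
print(stats)
```

Each year collapses to a single output row, which is the key difference from the window functions covered later.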

Joins

Joins make it possible to answer complex questions from your data by combining multiple datasets. All the major kinds of joins are available in Spark, with a few extras:

  • inner join - returns records whose keys match in both table 1 and table 2

  • left outer join - returns all records from the left table, plus the matching records from the right table

  • right outer join - same as above but reversed

  • full outer join - returns all records from both tables whether they match on key or not

  • cross join - cartesian product of the records of table 1 and table 2

  • left semi join - returns records from the left table where there is a match in the right table

  • left anti join - returns records from the left table where there is no match in the right table
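The differences between these join types are easiest to see on a tiny example. The following is a plain-Python model of four of them, written for intuition rather than as Spark code. The data and the function names are made up for illustration, and Spark itself does not guarantee any particular row order in the result.

```python
# Made-up rows keyed by a movie id, purely for illustration.
left = [(1, "Alpha"), (2, "Beta"), (3, "Gamma")]
right = [(2, "US"), (3, "FR"), (3, "DE"), (4, "JP")]


def inner_join(left, right):
    """One output row per matching (left, right) pair."""
    return [(k, lv, rv) for k, lv in left for rk, rv in right if k == rk]


def left_outer_join(left, right):
    """Like inner, but unmatched left rows are kept with None on the right."""
    out = []
    for k, lv in left:
        matches = [rv for rk, rv in right if rk == k]
        out.extend((k, lv, rv) for rv in (matches or [None]))
    return out


def left_semi_join(left, right):
    """Left rows with at least one match; no right columns, no duplication."""
    right_keys = {rk for rk, _ in right}
    return [(k, lv) for k, lv in left if k in right_keys]


def left_anti_join(left, right):
    """Left rows with no match on the right."""
    right_keys = {rk for rk, _ in right}
    return [(k, lv) for k, lv in left if k not in right_keys]


print(inner_join(left, right))
print(left_semi_join(left, right))
print(left_anti_join(left, right))
```

Note that the inner join returns "Gamma" twice because it matches two rows on the right, while the semi join returns it once and carries no columns from the right table.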

# Create a single-row dataframe holding the median release year
median_year = df_tmdb.agg(F.median(F.year(F.col('release_date'))).alias('median_year'))


# Full outer join example: Get the movies released in or after the median release year
# (no join key is given, so every movie row is paired with the single median_year row)
df_tmdb.join(median_year, how='full')\
    .select('title', 'release_date', 'median_year')\
    .where(F.year(F.col('release_date'))\
    .cast(T.DoubleType()) >= F.col('median_year'))


# Left Semi: Movies with release year same as the median release year
df_tmdb.join(median_year, how='left_semi',
             on=(F.year(df_tmdb.release_date).cast(T.DoubleType()) == median_year.median_year))\
            .select('title', 'release_date')


# Anti: Movies with release year NOT the same as the median release year
df_tmdb.join(median_year, how='anti',
             on=(F.year(df_tmdb.release_date).cast(T.DoubleType()) == median_year.median_year))\
            .select('title', 'release_date')

Window Functions

This group of functions performs operations on defined windows across rows of data. Unlike aggregation functions, they do not alter the number of records, since each row gets a result based on the window around it.

To use a window function in Spark, you need to first define the window spec and then create a new column using the window function on the spec.

# Define the window spec
# here, we partition by release year (like a group by release year) and order by the vote_average
window = Window.partitionBy(F.year(F.col('release_date'))).orderBy(F.col('vote_average').desc())


# Use the window function on the window spec
# We use a dense_rank function to rank each record based on its vote_average within the 
# release year
df_tmdb.filter((F.isnotnull(df_tmdb.release_date)) & (df_tmdb.vote_count > 0))\
    .withColumn('vote_rank', F.dense_rank().over(window))\
    .select('title', 'release_date', 'budget', 'vote_average', 'genres', 'vote_count', 'vote_rank')\
    .filter(F.col('vote_rank') == 1)\
    .orderBy(F.col('release_date').desc_nulls_last())\
    .show(10)
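To see what dense_rank does inside each partition, here is a plain-Python model of the same idea. It is a sketch for intuition, not Spark code: the rows are made up and `dense_rank_by_year` is a hypothetical helper name.

```python
from itertools import groupby

# Made-up (release_year, title, vote_average) rows, purely for illustration.
rows = [
    (2020, "A", 8.0), (2020, "B", 9.0), (2020, "C", 8.0),
    (2021, "D", 7.5), (2021, "E", 7.5),
]


def dense_rank_by_year(rows):
    """Rank rows within each year by vote_average, highest first.

    Ties share a rank, and the next distinct value gets the next integer,
    so the ranks have no gaps.
    """
    ranked = []
    # Sort by year, then by vote_average descending (mirrors the window spec).
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    for _, partition in groupby(ordered, key=lambda r: r[0]):
        rank, previous = 0, None
        for year, title, vote in partition:
            if vote != previous:
                rank, previous = rank + 1, vote
            ranked.append((year, title, vote, rank))
    return ranked


ranked = dense_rank_by_year(rows)
top_rated = [r for r in ranked if r[3] == 1]
print(top_rated)
```

The output has as many rows as the input, and filtering on rank 1 can return several movies per year when they tie on vote_average, as "D" and "E" do here.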

Another example is to look at the cumulative sum of votes for each movie per release year. Here, we use the sum function alongside a window spec to achieve the desired effect.

# Window spec: partition by release year and order by month
# the rolling window for each record is all prior rows in the partition until the current row
cum_sum_window = Window.partitionBy(F.year(F.col('release_date')))\
    .orderBy(F.month(F.col('release_date')))\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# We use the sum function to find the cumulative vote_count for each row
df_tmdb.filter((F.isnotnull(df_tmdb.release_date)) & (df_tmdb.vote_count > 0))\
    .withColumn('cum_sum_votes', F.sum('vote_count').over(cum_sum_window))\
    .select('title', 'release_date', 'budget', 'vote_average', 'genres', 'vote_count', 'cum_sum_votes')\
    .orderBy(F.col('cum_sum_votes').desc_nulls_last())\
    .show(10)
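The frame from the start of the partition up to the current row is what turns a plain sum into a running total. A plain-Python model of that frame might look like the sketch below. The rows are made up, and `cumulative_votes` is a hypothetical helper name rather than anything from Spark.

```python
from itertools import accumulate, groupby

# Made-up (release_year, release_month, vote_count) rows, for illustration only.
rows = [(2020, 7, 20), (2020, 1, 10), (2020, 3, 5), (2021, 9, 6), (2021, 2, 4)]


def cumulative_votes(rows):
    """Running total of vote_count per year, ordered by month."""
    out = []
    # Partition by year and order by month (mirrors the window spec).
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for _, partition in groupby(ordered, key=lambda r: r[0]):
        partition = list(partition)
        # Each total covers every row from the partition start to the current row.
        totals = accumulate(votes for _, _, votes in partition)
        out.extend((*row, total) for row, total in zip(partition, totals))
    return out


running = cumulative_votes(rows)
print(running)
```

The total restarts at each new year because the frame never crosses a partition boundary. One caveat for real data: when several movies share the same month, the order among them is not fixed by the window spec, so their individual running totals can differ between runs.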

Wrap-Up

This has been a whistle-stop tour of the main data manipulation techniques in PySpark. Pandas users will find the DataFrame API familiar, while SQL users will recognize the semantics and be able to pick it up fairly easily.

Written by

Cyril Ukwajiunor