How to Supercharge Your Streaming Data Pipeline in Python


Streaming data processing has come a long way, so why stick to old methods when modern practices are available? Let me share a perspective that might help you solve this problem.
Inspiration from Batch Processing
Batch processing shines in (but is not limited to) the following general use cases:
Transforming data in ETL/ELT data pipelines.
Performing aggregations, grouping, filtering, joins, analytics, and more; the list is never-ending.
Doing all kinds of operations on table data, like append, merge, delete, and update.
This side of the world is pretty mature now: we have a very good set of tools and frameworks for building such pipelines. So why not apply the same functionality to a streaming pipeline without losing its real-time processing character?
Problem Statement
Track customers' journeys on an e-commerce website. The events include which products a customer viewed, which products were added to the cart, and finally, which products were purchased. Calculate metrics that can be used for analysis.
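The event payloads themselves aren't defined in the post, but the producer code below calls .model_dump(mode="json") on them, which suggests Pydantic models. A minimal sketch of what the event schemas could look like follows; the product_id field and the class names are assumptions, since only user_id, event_id, timestamp, and price appear in the original code.

from datetime import datetime

from pydantic import BaseModel


class ProductViewEvent(BaseModel):
    event_id: str
    user_id: str
    product_id: str  # assumed field - not visible in the original code
    timestamp: datetime


class AddToCartEvent(BaseModel):
    event_id: str
    user_id: str
    product_id: str  # assumed field
    price: float
    timestamp: datetime


class PurchaseEvent(BaseModel):
    event_id: str
    user_id: str
    product_id: str  # assumed field
    price: float
    timestamp: datetime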
I am using Kafka as my streaming platform. In Python, two popular client libraries can be used to interact with Kafka brokers.
Here is my alternative take: neither of them should be used directly.
Neither of them has Python typing support, so no IDE can work with them efficiently.
Both need a fair amount of boilerplate code (see the sketch below for a taste).
Their scope is limited to just Producer, Consumer & Admin.
To apply the principles of batch processing, we have to write a lot of custom code.
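For illustration, here is roughly what producing a single event looks like with a raw client such as confluent-kafka (presumably one of the two libraries above); the untyped dict config, manual serialization, and delivery callback wiring are all on you:

import json

from confluent_kafka import Producer

# Plain dict config - no type hints, so no IDE autocompletion or validation
producer = Producer({"bootstrap.servers": "localhost:9092"})


def on_delivery(err, msg):
    # You have to wire up your own delivery reporting
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}]")


payload = {"event_id": "abc", "user_id": "user-1"}
producer.produce(
    "ecomm_product_view",
    key="user-1",
    value=json.dumps(payload).encode("utf-8"),  # manual serialization
    callback=on_delivery,
)
producer.poll(0)  # serve the delivery callback
producer.flush()  # block until all queued messages are delivered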
You need not worry: there are open-source libraries like Pathway and Quix Streams that can be used for stream processing. Let's quickly compare them.
Pathway has many more GitHub stars compared to Quix Streams.
Pathway also supports a lot more connectors and sinks than Quix Streams.
Both allow us to do batch processing tasks like filters, aggregation, joins, analytics, and Python UDF on streaming data.
Pathway's strength in supporting many connectors and sinks is also its weakness, as it makes the library quite large. Quix Streams is designed for one main purpose: Streaming.
The biggest advantage of Quix Streams is that it lets us use low-level Kafka client tools with full typing support, allowing us to use IDEs efficiently. This was the main reason I chose Quix Streams.
The Producer:
Below, I am generating fake data for a customer journey & sending the events to their respective topics.
import logging

from quixstreams import Application

from src.ops.generator import generate_dummy_e_commerce_data

logger = logging.getLogger("data-pipeline")

app = Application(
    broker_address="localhost:9092",
    loglevel="DEBUG",
    producer_extra_config={
        "linger.ms": "300",  # Wait up to 300ms for more messages before sending
        "compression.type": "gzip",  # Use gzip compression for messages
    },
)

# Creating the Kafka topic clients
product_view_topic = app.topic(
    "ecomm_product_view", value_serializer="json", key_serializer="string"
)
cart_topic = app.topic("ecomm_cart", value_serializer="json", key_serializer="string")
buy_topic = app.topic("ecomm_buy", value_serializer="json", key_serializer="string")

# quixstreams' Application allows us to use a context manager, so we won't need to
# worry about closing the connection gracefully
with app.get_producer() as producer:
    for i in range(200):
        logger.info(f"current iteration: {i}")
        # generate dummy data
        event = generate_dummy_e_commerce_data()

        # sending the event to respective topics
        # Product view event
        product_view_msg = product_view_topic.serialize(
            key=event["product_view"].user_id,
            value=event["product_view"].model_dump(mode="json"),
        )
        producer.produce(
            product_view_topic.name,
            value=product_view_msg.value,
            key=product_view_msg.key,
        )
        logger.debug(f"producing product view event: {event['product_view'].event_id}")

        # Add to cart event
        # NOTE - It is possible that the add_to_cart and purchase events are None
        if event["add_to_cart"]:
            cart_msg = cart_topic.serialize(
                key=event["add_to_cart"].user_id,
                value=event["add_to_cart"].model_dump(mode="json"),
            )
            producer.produce(cart_topic.name, value=cart_msg.value, key=cart_msg.key)
            logger.debug(
                f"producing product add to cart event: {event['add_to_cart'].event_id}"
            )

        # Purchase event
        if event["purchase"]:
            buy_msg = buy_topic.serialize(
                key=event["purchase"].user_id,
                value=event["purchase"].model_dump(mode="json"),
            )
            producer.produce(buy_topic.name, value=buy_msg.value, key=buy_msg.key)
            logger.debug(f"producing buy event: {event['purchase'].event_id}")
Important Point:
The default settings for the Producer are good, but to extract more performance, you'll need to fine-tune them. I usually play around with the following producer config settings:
compression.type: to compress messages. I usually use gzip; it has wide support.
linger.ms: how long to wait to gather more messages before sending. It should depend on the frequency of incoming messages.
It's recommended to use the context manager of the quixstreams app object, so that connections are closed gracefully at the end.
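For example, if gzip's CPU overhead ever becomes a concern, librdkafka-based clients (which Quix Streams uses under the hood) also accept lz4 and zstd for compression.type. A variant of the producer config above might look like this:

from quixstreams import Application

# Alternative tuning: trades some compression ratio for lower CPU usage
app = Application(
    broker_address="localhost:9092",
    producer_extra_config={
        "linger.ms": "500",  # wait a bit longer to form bigger batches
        "compression.type": "lz4",  # faster than gzip, slightly larger payloads
    },
)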
The Consumer: Part A
Consume messages from three topics —> perform data processing —> perform a join to calculate the customer journey —> publish results to a 4th topic.
from quixstreams import Application

from src.ops.transform import convert_utc_to_ist

app = Application(
    broker_address="localhost:9092",
    consumer_group="ecomm_sync_group",
    auto_offset_reset="earliest",
    consumer_extra_config={
        "auto.offset.reset": "earliest",  # Start reading from the earliest message
        "enable.auto.commit": "true",  # Automatically commit offsets
    },
    loglevel="DEBUG",
)

# add topics to consume
product_view_topic = app.topic(
    "ecomm_product_view", value_deserializer="json", key_serializer="string"
)
cart_topic = app.topic("ecomm_cart", value_deserializer="json", key_serializer="string")
buy_topic = app.topic("ecomm_buy", value_deserializer="json", key_serializer="string")

# Output topic for customer journey
customer_journey_topic = app.topic(
    "ecomm_customer_journey",
    value_serializer="json",
)

# Streaming dataframe consumers
product_view_sdf = app.dataframe(product_view_topic)
cart_sdf = app.dataframe(cart_topic)
buy_sdf = app.dataframe(buy_topic)

# Stream processing
# Product view topic
product_view_sdf["timestamp"] = product_view_sdf["timestamp"].apply(convert_utc_to_ist)

# Add to cart topic
cart_sdf["price"] = cart_sdf["price"].apply(lambda x: round(x, 2))
cart_sdf["timestamp"] = cart_sdf["timestamp"].apply(convert_utc_to_ist)

# Join product view and cart dataframes on user_id
joined_view_cart = cart_sdf.join_asof(
    right=product_view_sdf, how="left", on_merge="keep-left"
)

# Buy product topic
buy_sdf["price"] = buy_sdf["price"].apply(lambda x: round(x, 2))
buy_sdf["timestamp"] = buy_sdf["timestamp"].apply(convert_utc_to_ist)

joined_buy = buy_sdf.join_asof(right=joined_view_cart, how="left", on_merge="keep-left")
joined_buy.to_topic(customer_journey_topic)

# Starting the app to process streams in real time
app.run()
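The convert_utc_to_ist helper comes from the author's repo and isn't shown in the post. A minimal sketch of what such a function might do, assuming the events carry ISO-8601 UTC timestamp strings, is:

from datetime import datetime
from zoneinfo import ZoneInfo


def convert_utc_to_ist(timestamp: str) -> str:
    """Convert an ISO-8601 UTC timestamp string to Indian Standard Time (UTC+05:30)."""
    utc_dt = datetime.fromisoformat(timestamp)
    if utc_dt.tzinfo is None:
        utc_dt = utc_dt.replace(tzinfo=ZoneInfo("UTC"))
    return utc_dt.astimezone(ZoneInfo("Asia/Kolkata")).isoformat()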
Important Points:
Same as the Producer, the default settings for the Consumer are good, but to extract more performance you'll need to fine-tune them via the consumer config settings.
Quix Streams provides us with the StreamingDataFrame interface to apply all kinds of transformation & analytical logic, including Python UDFs (see the sketch after this list for a stateful example).
I am performing a Stateful Join on two StreamingDataFrames. It uses RocksDB to maintain the state, flushing data to disk. I highly recommend reading more on it here.
After a couple of Join operations, I am sending the result as an event to yet another topic. It only gets triggered when the join operation finds a matching key. In this way, I can track the entire journey of the customer from product view to add to cart to, finally, purchase.
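As an aside on Python UDFs and state: here is a sketch (not part of the original pipeline) of how a stateful per-user metric, say a running count of product views, could be added to one of the StreamingDataFrames above. Stateful functions receive a State object backed by the same RocksDB store mentioned above:

from quixstreams import State


def count_views(value: dict, state: State) -> dict:
    # State is scoped to the message key, i.e. the user_id in this pipeline
    total = state.get("view_count", default=0) + 1
    state.set("view_count", total)
    value["view_count"] = total
    return value


# Assumes the product_view_sdf defined in the consumer code above
product_view_sdf = product_view_sdf.apply(count_views, stateful=True)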
The Consumer: Part B
Consuming messages using a low-level client library API to perform custom sink logic.
import logging
from pathlib import Path

import orjson
import polars as pl
from quixstreams import Application

from src.connector import DataWriter

logger = logging.getLogger("data-pipeline")

merge_option = DataWriter.generate_delta_table_merge_method_options(
    when_not_matched_insert_all=True, when_matched_update_all=True
)

app = Application(
    broker_address="localhost:9092",
    consumer_group="ecomm_customer_report_group",
    auto_offset_reset="earliest",
    consumer_extra_config={
        "enable.auto.commit": True,  # Automatically commit offsets
    },
    loglevel="DEBUG",
)

with app.get_consumer() as consumer:
    consumer.subscribe(topics=["ecomm_customer_journey"])
    # Starting the 'forever consuming' consumer
    while True:
        message = consumer.poll(0.5)
        if message is None:
            continue
        elif message.error():
            logger.error("Kafka error: %s", message.error())
            continue
        value = message.value()

        # Merge data into the Delta Lake table
        df = pl.from_dict(orjson.loads(value))
        merge_stats = DataWriter.delta_table_merge_disk(
            df=df,
            path=Path(__file__).parent.parent.parent
            / "data/gold/ecomm_customer_journey",
            delta_merge_options={
                "predicate": "source.event_id = target.event_id",  # condition to determine if an upsert is required
                "source_alias": "source",
                "target_alias": "target",
            },
            delta_merge_method_options=merge_option,
        )
        logger.info(f"merged successfully with stats: {merge_stats}")
        consumer.store_offsets(message=message)
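DataWriter is the author's own helper (the real code is in the linked repo). Conceptually, the merge it performs boils down to something like this sketch with polars and the deltalake package; the table is assumed to already exist on disk:

from pathlib import Path

import polars as pl
from deltalake import DeltaTable


def delta_merge_sketch(df: pl.DataFrame, path: Path) -> dict:
    """Upsert rows into an existing Delta Lake table, matching on event_id."""
    table = DeltaTable(str(path))
    return (
        table.merge(
            source=df.to_arrow(),
            predicate="source.event_id = target.event_id",
            source_alias="source",
            target_alias="target",
        )
        .when_matched_update_all()  # update rows whose event_id already exists
        .when_not_matched_insert_all()  # insert brand-new event_ids
        .execute()  # returns merge statistics
    )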
Important Points:
This is the second way to get messages from Kafka topics. It requires more code but gives us flexibility in processing. However, it's not the main recommendation; using StreamingDataFrame is preferred.
I needed to do a Delta Merge on the Delta Lake table, which the quixstreams library doesn't support directly. So, I used the low-level client API to achieve it.
I like the quixstreams library because it offers a high-level API, StreamingDataFrame, for batch-like processing on streaming data. But if that's not an option, it also provides a low-level Kafka client API, allowing us to do almost anything.
Pathway, by contrast, supports only append and no delta merge. Also, it does not expose any direct client API.
Note: I have open-sourced my project & you can find all the source code here - Akashdesarda/data-pipeline-app-demo
Conclusion:
Supercharging your streaming data pipeline in Python involves using modern tools and practices to boost efficiency and performance.
Draw inspiration from batch processing to apply similar principles to streaming data without losing real-time capabilities.
Use advanced libraries like Quix Streams for efficient data processing.
Features like StreamingDataFrame allow for high-level operations, while low-level client APIs are available for custom tasks.
This approach simplifies development and provides flexibility and scalability.
It enables effective tracking and analysis of customer journeys in real-time.
Embracing these modern techniques ensures your data pipeline is robust, efficient, and meets the demands of today's data-driven environments.