Using dlt to Improve Data Workflows

Context

In a recent project for Thameslink UK, I designed and implemented an end-to-end automated data pipeline for tracking and analyzing customer-reported feedback on railway service failures and delays. Azure Serverless Functions were used to ingest data from Twitter and apply sentiment analysis models to extract key insights.

The results were stored in an Azure Serverless SQL database and visualized using interactive dashboards on Power BI. You can have a look at the project report.

Figure 1. Existing System Design

Benefits of using dlt

While the project was successful, I could have benefited from using dlt to simplify and streamline the data-loading process.

Figure 2. Revised System Design with dlt

Incremental Data Loading

I used a try-except block to handle errors caused by duplicate insertions in the database.

for index, row in df.iterrows():
    try:
        cursor.execute(
            "INSERT INTO dbo.tweet (id, tweet_timestamp, Tweet_text, sentiment, topic, "
            "tweet_time, Tweet_date, tweet_day, tweet_hour) VALUES (?,?,?,?,?,?,?,?,?)",
            row.id, row.time, row.text, row.sentiment, row.topic,
            row.Time, row.Date, row.Day, row.Hour,
        )
    except SQLdb.IntegrityError as e:
        logging.warning(e)

dlt supports incremental loading, which avoids duplicates in the database.

With the merge write disposition, dlt deduplicates data at load time: incoming records are matched on a primary key and existing rows are updated when necessary, so only unique records end up in the database.

For example:

@dlt.resource(primary_key="tweet_id", write_disposition="merge")
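Fleshed out into a minimal, runnable sketch (the sample records, pipeline name, and dataset name below are illustrative, not the ones from the original project), a merge resource could look like this:

import dlt

# Illustrative records standing in for rows pulled from the Twitter API.
sample_tweets = [
    {"tweet_id": 1, "tweet_text": "Train delayed again at St Albans", "sentiment": "negative"},
    {"tweet_id": 2, "tweet_text": "Smooth journey this morning", "sentiment": "positive"},
]

@dlt.resource(primary_key="tweet_id", write_disposition="merge")
def tweets():
    # Because the resource declares a primary key and the "merge" disposition,
    # re-running the pipeline updates rows with an existing tweet_id instead
    # of inserting duplicates.
    yield from sample_tweets

pipeline = dlt.pipeline(pipeline_name="thameslink_feedback",
                        destination="duckdb",
                        dataset_name="twitter_data")
load_info = pipeline.run(tweets())

Re-running this pipeline with overlapping data updates the matching rows rather than raising the integrity errors handled in the try-except block above.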

Loading Data from multiple sources

In my original setup, loading data from multiple sources would have meant writing a separate data-fetching function for each source and then coordinating ingestion and processing across those functions myself.

With dlt, I could define a dlt resource for each data source and specify how its data is fetched. I could also specify the transformation steps required for each source and, finally, use the merge write disposition to consolidate the data and load it into the final destination.
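A rough sketch of that idea, assuming two hypothetical feeds (Twitter feedback plus a second incidents feed) grouped under one dlt source:

import dlt

@dlt.resource(primary_key="tweet_id", write_disposition="merge")
def twitter_feedback():
    # Hypothetical stand-in for tweets fetched from the Twitter API.
    yield {"tweet_id": 1, "tweet_text": "Delay at Luton", "sentiment": "negative"}

@dlt.resource(primary_key="incident_id", write_disposition="merge")
def service_incidents():
    # Hypothetical stand-in for records from a second feed, e.g. an incidents API.
    yield {"incident_id": "INC-42", "station": "Luton", "status": "delayed"}

@dlt.source(name="railway_feedback")
def railway_feedback_source():
    # Each resource becomes its own table in the destination dataset.
    return [twitter_feedback, service_incidents]

pipeline = dlt.pipeline(pipeline_name="railway_feedback",
                        destination="duckdb",
                        dataset_name="feedback_data")
load_info = pipeline.run(railway_feedback_source())

Each resource lands in its own table, while the source keeps the fetching logic for every feed in one place.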

Faster Setup

I spent a considerable amount of time configuring the pipeline manually. dlt provides a framework with templates that scaffold the necessary configuration files, including the credentials file for accessing data sources, along with the basic infrastructure setup.

This would make it easier to get started with data loading without having to worry about these details and boilerplate code.
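For example, running dlt init <source> <destination> generates a .dlt folder with config.toml and secrets.toml, and credentials declared there can be injected straight into the pipeline code instead of being parsed by hand. A minimal sketch, assuming a bearer token stored under a matching key in .dlt/secrets.toml (the parameter name here is hypothetical):

import dlt

@dlt.resource(write_disposition="append")
def tweets(api_bearer_token: str = dlt.secrets.value):
    # dlt resolves api_bearer_token from .dlt/secrets.toml (or an environment
    # variable) at run time, so there is no manual toml parsing.
    # A real implementation would call the Twitter API with this token here.
    yield {"tweet_id": 1, "tweet_text": "placeholder"}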

Intuitive transformation and loading

I designed the database schemas and also set the data type for every column in the tables manually.

Using dlt would have saved me that effort: it provides a simple, intuitive way to normalize and transform data during the ETL process, and its run function automatically creates the destination tables and infers the data types of the columns.
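As a sketch of what that looks like in practice (the record below is made up), a single run call creates the table, infers most column types from the data, and normalizes the nested list into a child table, while a columns hint can still pin a type explicitly:

import dlt

# A made-up record with a nested list and mixed value types.
rows = [
    {
        "tweet_id": 101,
        "tweet_timestamp": "2023-03-01T08:15:00Z",
        "sentiment_score": 0.82,
        "topics": ["delay", "refund"],  # becomes a child table automatically
    }
]

pipeline = dlt.pipeline(pipeline_name="schema_demo",
                        destination="duckdb",
                        dataset_name="twitter_data")

# run() infers the column types and creates the destination tables, including
# a child table for the "topics" list; the columns argument pins one type.
load_info = pipeline.run(
    rows,
    table_name="tweets",
    columns={"tweet_timestamp": {"data_type": "timestamp"}},
)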

Challenges

Using dlt has its challenges and limitations. It has a steep learning curve, especially for beginners who are not familiar with data engineering concepts such as ETL, pipelines, and incremental loading.

My experience using dlt

I tried to recreate a part of my project using dlt.

Here is a code snippet that fetches data from the Twitter API and loads it into DuckDB.

# import libraries
import dlt
import tweepy
import toml

# Load the secrets file
secrets = toml.load("dlt/secrets.toml")

# Access the keys
consumer_key = secrets["consumer_key"]
consumer_secret = secrets["consumer_secret"]
access_key = secrets["access_key"]
access_secret = secrets["access_secret"]

@dlt.source(name="twitter_data")
def twitter_source(consumer_key, consumer_secret, access_key, access_secret):
    return twitter_resource(consumer_key, consumer_secret, access_key, access_secret)

@dlt.resource(write_disposition="append")
def twitter_resource(consumer_key, consumer_secret, access_key, access_secret):
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_key, access_secret)
    api = tweepy.API(auth)
    tweets = tweepy.Cursor(api.search_tweets, q='python', lang='en').items(10)
    for tweet in tweets:
        yield tweet._json

if __name__ == '__main__':
    # configure the pipeline with your destination details
    pipeline = dlt.pipeline(pipeline_name='twitter', destination='duckdb',
                            dataset_name='twitter_data')
    data = list(twitter_resource(consumer_key, consumer_secret, access_key, access_secret))
    # run the pipeline with your parameters
    load_info = pipeline.run(data, table_name='tweets')

I encountered some problems when attempting to access the secrets file using the boilerplate code. Additionally, the loading code from the user manual did not work as written. Despite my attempts to debug the issue with the built-in run function, the error persisted, so I ended up rewriting that part of the code.

However, apart from these issues, setting up the system was straightforward, and I was pleasantly surprised at how effortless it was to view all the loaded data and the generated tables via a Streamlit app in my local browser, and to query the data with SQL.
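For completeness, here is a small sketch of querying the loaded table directly from Python through the pipeline's SQL client (assuming the pipeline above has already run and created a tweets table):

import dlt

pipeline = dlt.pipeline(pipeline_name='twitter', destination='duckdb',
                        dataset_name='twitter_data')

# Open a SQL client against the pipeline's dataset and run an ad-hoc query.
with pipeline.sql_client() as client:
    with client.execute_query("SELECT count(*) FROM tweets") as cursor:
        print(cursor.fetchall())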

Written by Rashmi Carol Dsouza