Enhancing your dbt project with Large Language Models
TL;DR
You can automatically solve typical Natural Language Processing tasks (classification, sentiment analysis, etc.) for your text data using an LLM for as little as $10 per 1M rows (depending on the task and the model), without leaving your dbt environment. Instructions, details, and code are below.
If you are using dbt as your transformation layer, you may run into situations where you want to extract meaningful information from unstructured text data. Such data might include customer reviews, titles, descriptions, Google Analytics sources/mediums, etc. You might want to categorize them into groups or extract sentiment and tone.
Potential solutions would be
Apply machine learning models (or call an LLM) outside of the dbt flow
Define simple categorizations inside dbt models using CASE WHEN statements
Predefine categories upfront and either upload them into your raw database layer or leverage the dbt seed functionality
As Python dbt models are evolving, there is one more solution: you can keep these Natural Language Processing tasks inside your dbt environment as one of the dbt models.
If that sounds helpful, below is a step-by-step guide on how to use the OpenAI API in your dbt project. You can reproduce everything from this guide in your own environment using the code and data sample from the GitHub repository (see links at the end).
Set up environment
If you already have a dbt project and data or don't want to reproduce the results, jump to (4) or skip this section completely. Otherwise, you'll need the following:
Set up the dbt project. Official docs
You can simply clone the one I prepared for this guide from GitHub
Don't forget to create/update your profiles.yml file
Set up the database. I used Snowflake. Unfortunately, there is no free version, but they do provide a 30-day free trial
- Currently, dbt Python models work only with Snowflake, Databricks, and BigQuery (no PostgreSQL). So this tutorial should work for any of them, although some details might vary
Prepare source data
As a dataset, I used R package metadata published in the TidyTuesday repository
Upload it to your database
Update the source.yml file in the dbt project to match your database and schema names
Get the OpenAI API key
Follow quickstart instructions from official docs
Note it's not free, but it is pay-as-you-go. So with the test 10-row dataset, you won't be charged more than $1 during your experiments
To be extra careful, set a spending limit
Set up External Access Integration in Snowflake
This applies only if you use Snowflake
If this is not done, dbt Python models cannot access any API on the internet (including OpenAI API)
Follow official instructions
Store OpenAI API key in this Integration
Come up with a list of categories
Firstly, if you are solving a classification task, you need categories (a.k.a. classes) to use in your LLM prompt. Basically, you will say: "I have a list of these categories; could you define which one this text belongs to?"
Some options here:
Create a list of predefined categories manually
It suits cases where you need stable and predictable categories
Don't forget to add an "Others" category, so the LLM has this option when it's uncertain
Ask the LLM in your prompt to suggest a category name whenever it uses the "Others" category
Upload the predefined list to the raw layer of the database or as a CSV in your dbt project (utilizing dbt seed)
Feed a sample of your data to LLM and ask it to come up with N categories
The same approach as the previous one, but here we get help building the list (a minimal sketch of this is shown right after this list)
If you use GPT, it's better to set a seed parameter here for reproducibility
Go without predefined categories and let the LLM do the work on the fly
This might lead to less predictable results
At the same time, it is good enough if you are alright with a margin of randomness
In the GPT use case, it's better to set temperature = 0 to avoid getting different results if you need to rerun
In this blog post, I will go with the 3rd option.
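Even though this post goes with the 3rd option, here is a minimal sketch of how the 2nd option (asking the LLM to propose categories from a sample) could look. The prompt wording, the sample titles, and the model choice are assumptions for illustration, not part of the project code:

import openai

# Assumes OPENAI_API_KEY is set in the environment
client = openai.OpenAI()

# Hypothetical sample of titles drawn from the source table
sample_titles = ['Tidy Data Manipulation', 'Bayesian Mixed Models', 'Interactive Web Maps']

response = client.chat.completions.create(
    model='gpt-3.5-turbo',
    temperature=0,
    seed=42,  # improves reproducibility of the suggested list
    messages=[
        {'role': 'system', 'content': 'Suggest 10 broad category names for the R package titles provided. Return only the names, one per line.'},
        {'role': 'user', 'content': '|'.join(sample_titles)},
    ],
)
suggested_categories = response.choices[0].message.content.splitlines()
print(suggested_categories)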
Create a dbt Python model to call OpenAI API
Now let's get to the meat of this post and create a dbt model that will take new text data from the upstream table, feed it to the OpenAI API, and save the category to the table.
As mentioned above, I'm going to use the R packages dataset. R is a highly popular programming language in data analysis. This dataset contains information about R packages from the CRAN project, such as version, license, author, title, description, etc. We are interested in the title field, as we are going to create a category for each package based on its title.
Prepare the base for the model
dbt config can be passed via the dbt.config(...) method
There are additional arguments in dbt.config; for example, packages is the list of package requirements
A dbt Python model can reference upstream models via dbt.ref('...') or dbt.source('...')
It must return a DataFrame. Your database will save it as a table
import os
import openai
import pandas as pd

COL_TO_CATEGORIZE = 'title'

def model(dbt, session):
    import _snowflake

    dbt.config(
        packages=['pandas', 'openai'],
    )

    df = dbt.ref('package').to_pandas()
    df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True)

    return df
Connect to OpenAI API
We need to pass secrets and external_access_integrations to dbt.config. They will contain the reference to the secret stored in your Snowflake External Access Integration
Note that this feature was released only several days ago and is only available in the beta dbt version 1.8.0-b3
dbt.config(
    packages=['pandas', 'openai'],
    secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
    external_access_integrations=['openai_external_access_integration'],
)

client = openai.OpenAI(
    api_key=_snowflake.get_generic_secret_string('openai_key'),
    organization=_snowflake.get_generic_secret_string('openai_org'),
)
Make dbt model incremental and turn off full refreshes
This part is essential to keep the OpenAI API costs low
It will prevent you from categorizing the same text multiple times
Otherwise, you will send the full dataset to OpenAI every time you execute dbt run, which can be several times a day
We are adding materialized='incremental', incremental_strategy='append', and full_refresh=False to dbt.config
Now the full scan happens only on the first dbt run; later runs (no matter incremental or full-refresh) will categorize only the delta
If you would like to be extra mindful, you can preprocess your data a little to reduce the number of unique entries, but avoid preprocessing too much as LLMs work better with natural language
dbt.config(
    materialized='incremental',
    incremental_strategy='append',
    full_refresh=False,
    packages=['pandas', 'openai'],
    secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
    external_access_integrations=['openai_external_access_integration'],
)

if dbt.is_incremental:
    pass
Add incrementality logic
On an incremental run (which, given our setup, means any run except the first one), we need to remove all already categorized titles
We can do that simply by using dbt.this, similar to normal incremental models
if dbt.is_incremental:
    categorized_query = f'''
    SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this }
    WHERE "category" IS NOT NULL
    '''
    categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()]
    df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
Call OpenAI API in batches
To reduce costs, it's better to send data to the OpenAI API in batches
The system prompt can be 5 times bigger than the text we need to classify. If we send the system prompt separately for each title, it will lead to much higher token usage for a repetitive part
The batch shouldn't be too big, though. With big batches, GPT starts to produce less stable results. From my experiments, a batch size of 5 works well enough
Additionally, to ensure the response does not exceed the relevant size, I added a max_tokens constraint
BATCH_SIZE = 5

n_rows = df.shape[0]
categories = [None for idx in range(n_rows)]

for idx in range(0, n_rows, BATCH_SIZE):
    df_sliced = df.iloc[idx:idx + BATCH_SIZE, :]
    user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```'
    chat_completion = client.chat.completions.create(
        messages=[
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {'role': 'user', 'content': user_prompt},
        ],
        model='gpt-3.5-turbo',
        temperature=0,
        max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE,
    )
    gpt_response = chat_completion.choices[0].message.content
    gpt_response = [category.strip() for category in gpt_response.split('|')]
    categories[idx:idx + len(gpt_response)] = gpt_response

df['category'] = categories
df.dropna(subset=['category'], inplace=True)
- Time to talk about the prompt for the LLM. Here's what I got:
You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign.
Keep the instructions straight to the point
Use the ``` technique to guard against prompt injection from the input text
Be clear on the result format. In my case, I asked for "|" as a separator for both inputs and outputs
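To make the format concrete, here is what a single batched user prompt and the expected reply look like (the titles and the reply below are illustrative, not taken from the real run):

# Hypothetical batch of 3 titles joined with the "|" separator and wrapped in ``` brackets
titles = ['Tools for Plotting Spatial Data', 'Fast JSON Parser', 'Bayesian Regression Models']
user_prompt = f'```{"|".join(titles)}```'
# -> ```Tools for Plotting Spatial Data|Fast JSON Parser|Bayesian Regression Models```

# The model is expected to reply with the same number of "|"-separated values, e.g.:
# Data Visualization|Data Import|Statistical Modeling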
- Final dbt model code
import os
import openai
import pandas as pd

SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets.
Titles will be separated by "|" sign.
Come up with a category for each title.
Return only category names separated by "|" sign.
'''
COL_TO_CATEGORIZE = 'title'
BATCH_SIZE = 5

def model(dbt, session):
    import _snowflake

    dbt.config(
        materialized='incremental',
        incremental_strategy='append',
        full_refresh=False,
        packages=['pandas', 'openai'],
        secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
        external_access_integrations=['openai_external_access_integration'],
    )

    client = openai.OpenAI(
        api_key=_snowflake.get_generic_secret_string('openai_key'),
        organization=_snowflake.get_generic_secret_string('openai_org'),
    )

    df = dbt.ref('package').to_pandas()
    df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True)

    # On incremental runs, keep only titles that have not been categorized yet
    if dbt.is_incremental:
        categorized_query = f'''
        SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this }
        WHERE "category" IS NOT NULL
        '''
        categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()]
        df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]

    # Send titles to the OpenAI API in batches to keep token usage low
    n_rows = df.shape[0]
    categories = [None for idx in range(n_rows)]
    for idx in range(0, n_rows, BATCH_SIZE):
        df_sliced = df.iloc[idx:idx + BATCH_SIZE, :]
        user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```'
        chat_completion = client.chat.completions.create(
            messages=[
                {'role': 'system', 'content': SYSTEM_PROMPT},
                {'role': 'user', 'content': user_prompt},
            ],
            model='gpt-3.5-turbo',
            temperature=0,
            max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE,
        )
        gpt_response = chat_completion.choices[0].message.content
        gpt_response = [category.strip() for category in gpt_response.split('|')]
        categories[idx:idx + len(gpt_response)] = gpt_response

    df['category'] = categories
    df.dropna(subset=['category'], inplace=True)

    return df
Cost estimations
OpenAI API pricing is listed here. They charge for the number of tokens requested and returned. A token is a unit that roughly corresponds to a number of characters in your request. There are open-source packages to estimate the number of tokens for a given text, for example, Tiktoken. If you want to evaluate it manually, the place to go is the official OpenAI tokenizer here.
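If you want to estimate token counts programmatically, here is a minimal sketch using tiktoken (the sample titles are placeholders; the system prompt string stands in for the one defined in the model above):

import tiktoken

encoding = tiktoken.encoding_for_model('gpt-3.5-turbo')

# The system prompt from the model above and a couple of placeholder titles
system_prompt = 'You will be provided a list of CRAN R package titles in ``` brackets. ...'
sample_titles = ['Tools for Plotting Spatial Data', 'Fast JSON Parser']

# The system prompt is re-sent with every batch, so it is counted per batch, not per title
system_prompt_tokens = len(encoding.encode(system_prompt))
title_tokens = sum(len(encoding.encode(title)) for title in sample_titles)
print(system_prompt_tokens, title_tokens)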
In our dataset, there are ~18K titles. Roughly, that is equal to 320K input tokens (180K for the titles and 140K for the system prompt if we use batch size = 5) and 50K output tokens. Depending on the model, the costs for the full scan will be:
GPT-4 Turbo: $4.7. Pricing: input: $10 / 1M tokens; output: $30 / 1M tokens.
GPT-4: $12.6. Pricing: input: $30 / 1M tokens; output: $60 / 1M tokens.
GPT-3.5 Turbo: $0.2. Pricing: input: $0.5 / 1M tokens; output: $1.5 / 1M tokens.
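As a sanity check, here is the arithmetic behind those numbers, using the token estimates above and the prices listed in this section:

# Rough cost check for the full ~18K-title scan
input_tokens = 320_000
output_tokens = 50_000

pricing = {  # USD per 1M tokens: (input, output)
    'gpt-4-turbo': (10, 30),
    'gpt-4': (30, 60),
    'gpt-3.5-turbo': (0.5, 1.5),
}

for model_name, (input_price, output_price) in pricing.items():
    cost = input_tokens / 1e6 * input_price + output_tokens / 1e6 * output_price
    print(f'{model_name}: ${cost:.2f}')
# -> gpt-4-turbo ≈ $4.70, gpt-4 ≈ $12.60, gpt-3.5-turbo ≈ $0.24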
Results
The dbt model worked like a charm. I successfully categorized all 18K packages without any gaps. The model proved to be cost-efficient and protected against multiple dbt runs.
I published the results dashboard to Tableau Public here. Feel free to play with it, download the data, and create whatever you desire on top of it.
Some interesting details I found:
Top-1 category is Data Visualization (1,190 packages, or 6%). I guess this proves the popularity of R as a visualization tool, especially with packages like Shiny, Plotly, and others.
The top two growing categories in 2023 were Data Import and Data Processing. It sounds like R has started to be used more as a data-processing tool.
The biggest year-over-year growth among the top 30 categories was Natural Language Processing in 2019, two years after the famous paper "Attention Is All You Need" and half a year after the GPT-1 release :)
Further ideas
We can use an alternative approach: GPT embeddings
It's much cheaper
But more engineering-heavy because you should do the classification part on your own (stay tuned, as I'm going to explore this option in one of the next posts)
Certainly, it makes sense to move this part out of dbt and push it to cloud functions or whatever infra you use. At the same time, if you'd like to keep it in dbt, this post has you covered
Avoid adding any other logic to the model. It should do one job: call the LLM and save the result. This will help you avoid re-running it unnecessarily
Chances are high that you are using many environments in your dbt project. You need to be mindful and avoid running this model over and over again in each developer environment on each Pull Request
- To do this, you can incorporate logic with if dbt.config.get("target_name") == 'dev'
Response with a delimiter can be unstable
For example, GPT can return fewer elements than you expected, and it will be hard to map initial titles to the list of categories
To overcome this, add response_format={ "type": "json_object" } to your request to require JSON output. See official docs
With JSON output, you can ask in the prompt for an answer in the format {"title": "category"}, and then map it back to your initial values (a minimal sketch of this is shown after this list)
Note it will be more expensive, as it will increase the response size
Strangely enough, the quality of classification dropped dramatically when I switched to JSON output for GPT-3.5 Turbo
There is an alternative in Snowflake: the cortex.complete() function. Check out a great post by Joel Labes on the dbt blog
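Here is a minimal sketch of how the JSON-based variant mentioned above could look. The prompt wording and the mapping step are assumptions for illustration, not the code used in this post:

import json
import openai

# Assumes OPENAI_API_KEY is set in the environment
client = openai.OpenAI()

titles = ['Tools for Plotting Spatial Data', 'Fast JSON Parser']
json_system_prompt = (
    'You will be provided a list of CRAN R package titles separated by "|". '
    'Return a JSON object mapping each title to a category, e.g. {"title": "category"}.'
)

chat_completion = client.chat.completions.create(
    model='gpt-3.5-turbo',
    temperature=0,
    response_format={'type': 'json_object'},
    messages=[
        {'role': 'system', 'content': json_system_prompt},
        {'role': 'user', 'content': '|'.join(titles)},
    ],
)
title_to_category = json.loads(chat_completion.choices[0].message.content)
# Map categories back to the dataframe; unmatched titles become NaN, e.g.:
# df['category'] = df[COL_TO_CATEGORIZE].map(title_to_category)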
That's it! Let me know what you think.
Links
Full code on GitHub: link
Tableau Public dashboard: link
TidyTuesday R dataset: link