Enhancing your dbt project with Large Language Models


TL;DR

You can automatically solve typical natural language processing tasks (classification, sentiment analysis, etc.) for your text data using LLMs for as little as $10 per 1M rows (depending on the task and the model), while staying in your dbt environment. Instructions, details, and code are below


If you are using dbt as your transformation layer, you might run into situations where you want to extract meaningful info from unstructured text data. Such data might include customer reviews, titles, descriptions, Google Analytics sources/mediums, etc. You might want to categorize them into groups or extract sentiment and tone.

Potential solutions would be:

  • Apply machine learning models (or call an LLM) outside of the dbt flow

  • Define simple categorizations inside dbt models using CASE WHEN statements

  • Predefine categories upfront and either upload them into your raw database layer or leverage the dbt seed functionality

As Python dbt models are evolving, there is one more solution: you can keep these Natural Language Processing tasks inside your dbt environment as one of the dbt models.

If that might be helpful for you, see below a step-by-step guide on how to use OpenAI API in your dbt project. You can reproduce everything from this guide in your environment, having the code and data sample from the GitHub repository (see links at the end).

Set up environment

If you already have a dbt project and data or don't want to reproduce the results, jump to (4) or skip this section completely. Otherwise, you'll need the following:

  1. Set up the dbt project. Official docs

    1. You can simply clone the one I prepared for this guide from GitHub

    2. Don't forget to create/update your profiles.yml file

  2. Set up the database. I used Snowflake. Unfortunately, there is no free version, but they do provide a 30-day free trial

    1. Currently, dbt Python models work only with Snowflake, Databricks, and BigQuery (not PostgreSQL). So this tutorial should work for any of them, although some details might vary
  3. Prepare source data

    1. As a dataset, I used R package metadata published in the TidyTuesday repository

      1. You can download it from here. Details on the dataset are here

      2. Alternatively, you can use a lightweight version from my repository here

    2. Upload it to your database

    3. Update the source.yml file in the dbt project to match your database and schema names

  4. Get the OpenAI API key

    1. Follow quickstart instructions from official docs

    2. Note it's not free, but it is pay-as-you-go, so with the test 10-row dataset you won't be charged more than $1 during your experiments

    3. To be extra careful, set a spending limit

  5. Set up External Access Integration in Snowflake

    1. This applies only if you use Snowflake

    2. If this is not done, dbt Python models cannot access any API on the internet (including the OpenAI API)

    3. Follow official instructions

    4. Store OpenAI API key in this Integration

Come up with a list of categories

First, if you are solving a classification task, you need categories (a.k.a. classes) to use in your LLM prompt. Basically, you will say: "I have a list of these categories; could you define which one this text belongs to?"

Some options here:

  1. Create a list of predefined categories manually

    1. It suits if you need stable and predictable categories

    2. Don't forget to add "Others" here, so the LLM has this option when it's uncertain

    3. Ask LLM in your prompt to suggest a category name whenever it uses the "Others" category

    4. Upload a predefined list to the raw layer of the database or as a CSV in your dbt project (utilizing dbt seed)

  2. Feed a sample of your data to LLM and ask it to come up with N categories

    1. The same approach as the previous one, but here we get help with the list

    2. If you use GPT, it's better to set a seed here for reproducibility

  3. Go without predefined categories and let LLM do the work on the go

    1. This might lead to less predictable results

    2. At the same time, it is good enough if you are alright with a margin of randomness

    3. In the GPT use case, it's better to set temperature = 0 to avoid getting different results if you need to rerun
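If you go with the first option, building the system prompt from a predefined list is straightforward; here is a minimal sketch (the category names and the helper are hypothetical, not from this project):

```python
# Hypothetical predefined categories; in practice you could load them
# from a dbt seed instead of hardcoding the list
CATEGORIES = ['Data Visualization', 'Data Import', 'Machine Learning', 'Others']

def build_system_prompt(categories):
    # Ask the LLM to pick strictly from the list, falling back to "Others"
    return (
        'You will be provided a list of titles in ``` brackets, separated by "|". '
        f'Assign each title one of these categories: {", ".join(categories)}. '
        'Use "Others" when none fits. Return only category names separated by "|".'
    )

print(build_system_prompt(CATEGORIES))
```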

In this blog post, I will go with the 3rd option.

Create a dbt Python model to call OpenAI API

Now let's get to the meat of this post and create a dbt model that will take new text data from the upstream table, feed it to the OpenAI API, and save the category into the table.

As mentioned above, I'm going to use the R packages dataset. R is a highly popular programming language for data analysis. This dataset contains information about R packages from the CRAN project, such as version, license, author, title, description, etc. We are interested in the title field, as we are going to create a category for each package based on its title.

  1. Prepare the base for the model

    • dbt config can be passed via the dbt.config(...) method

    • There are additional arguments in dbt.config; for example, packages lists the package requirements

    • A dbt Python model can reference upstream models via dbt.ref('...') or dbt.source('...')

    • It must return a DataFrame. Your database will save it as a table

import openai
import pandas as pd

COL_TO_CATEGORIZE = 'title'

def model(dbt, session):
    import _snowflake

    dbt.config(
        packages=['pandas', 'openai'],
        )

    df = dbt.ref('package').to_pandas()
    df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True)

    return df
  2. Connect to the OpenAI API

    • We need to pass secrets and external_access_integrations to dbt.config. They contain references to the secrets stored in your Snowflake External Access Integration

    • Note this feature was released only several days ago and is available only in the beta dbt version 1.8.0-b3

dbt.config(
    packages=['pandas', 'openai'],
    secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
    external_access_integrations=['openai_external_access_integration'],
    )

client = openai.OpenAI(
    api_key=_snowflake.get_generic_secret_string('openai_key'),
    organization=_snowflake.get_generic_secret_string('openai_org'),
    )
  3. Make the dbt model incremental and turn off full refreshes

    • This part is essential to keep the OpenAI API costs low

    • It will prevent categorizing the same text multiple times

    • Otherwise, you would send the full dataset to OpenAI every time you execute dbt run, which could be several times a day

    • We are adding materialized='incremental', incremental_strategy='append', full_refresh=False to dbt.config

    • Now the full scan happens only on the first dbt run; later runs (whether incremental or full-refresh) will categorize only the delta

    • If you would like to be extra mindful, you can preprocess your data a little to reduce the number of unique entries, but avoid preprocessing too much, as LLMs work better with natural language

dbt.config(
    materialized='incremental',
    incremental_strategy='append',
    full_refresh = False,
    packages=['pandas', 'openai'],
    secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
    external_access_integrations=['openai_external_access_integration'],
    )

if dbt.is_incremental:
    pass
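The light preprocessing mentioned above can be as simple as whitespace normalization, so near-duplicate titles dedupe to a single API call; a minimal stdlib sketch (the helper name is mine):

```python
def normalize_title(title):
    # Collapse internal runs of whitespace and trim the edges so that
    # near-duplicate titles map to one canonical string (and one API call)
    return ' '.join(title.split())

titles = ['A Tool ', 'A  Tool', 'Another Tool']
unique_titles = sorted({normalize_title(t) for t in titles})
print(unique_titles)  # ['A Tool', 'Another Tool']
```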
  4. Add incrementality logic

    • On an incremental run (due to our setup, that means any run except the first one), we need to remove all already-categorized titles

    • We can do it simply by using dbt.this, similar to normal incremental models

if dbt.is_incremental:
    categorized_query = f'''
    SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this }
    WHERE "category" IS NOT NULL
    '''
    categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()]
    df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
  5. Call the OpenAI API in batches

    • To reduce the costs, it's better to send data to OpenAI API in batches

    • The system prompt can be five times bigger than the text we need to classify. If we sent the system prompt separately for each title, it would lead to much higher token usage for the same repetitive content

    • Batches shouldn't be too big, though. With big batches, GPT starts to produce less stable results. From my experiments, a batch size of 5 works well enough

    • Additionally, to ensure the response does not exceed the relevant size, I added a max_tokens constraint

BATCH_SIZE = 5

n_rows = df.shape[0]
categories = [None for idx in range(n_rows)]
for idx in range(0, n_rows, BATCH_SIZE):
    df_sliced = df.iloc[idx:idx+BATCH_SIZE, :]
    user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```'

    chat_completion = client.chat.completions.create(
        messages=[
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {'role': 'user', 'content': user_prompt}
        ],
        model='gpt-3.5-turbo',
        temperature=0,
        max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE,
    )
    gpt_response = chat_completion.choices[0].message.content
    gpt_response = [category.strip() for category in gpt_response.split('|')]
    categories[idx:idx + len(gpt_response)] = gpt_response
df['category'] = categories
df.dropna(subset=['category'], inplace=True)
  6. Time to talk about the prompt for the LLM. Here is what I got:

You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign.

  • Keep instruction straight to the point

  • Use the ``` technique to avoid prompt injections

  • Be clear about the result format. In my case, I asked for "|" as the separator for both inputs and outputs
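Since the model may not always follow the format, a small guard that validates the parsed response against the batch size can save you from misaligned categories. A minimal sketch (the convention of returning None placeholders so the rows get retried on the next run is my assumption, not part of the model code in this post):

```python
def parse_batch_response(raw_response, expected_n):
    # Split the "|"-separated answer and verify it matches the batch size;
    # return None placeholders when it doesn't, so those rows stay
    # uncategorized and can be retried on the next incremental run
    categories = [c.strip() for c in raw_response.split('|')]
    if len(categories) != expected_n:
        return [None] * expected_n
    return categories

print(parse_batch_response('Stats|Viz|NLP', 3))  # ['Stats', 'Viz', 'NLP']
print(parse_batch_response('Stats|Viz', 3))      # [None, None, None]
```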

  7. Final dbt model code
import openai
import pandas as pd

SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets.
Titles will be separated by "|" sign.
Come up with a category for each title.
Return only category names separated by "|" sign.
'''
COL_TO_CATEGORIZE = 'title'
BATCH_SIZE = 5

def model(dbt, session):
    import _snowflake

    dbt.config(
        materialized='incremental',
        incremental_strategy='append',
        full_refresh = False,
        packages=['pandas', 'openai'],
        secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'},
        external_access_integrations=['openai_external_access_integration'],
        )

    client = openai.OpenAI(
        api_key=_snowflake.get_generic_secret_string('openai_key'),
        organization=_snowflake.get_generic_secret_string('openai_org'),
        )

    df = dbt.ref('package').to_pandas()
    df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True)
    if dbt.is_incremental:
        categorized_query = f'''
        SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this }
        WHERE "category" IS NOT NULL
        '''
        categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()]
        df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]

    n_rows = df.shape[0]
    categories = [None for idx in range(n_rows)]
    for idx in range(0, n_rows, BATCH_SIZE):
        df_sliced = df.iloc[idx:idx+BATCH_SIZE, :]
        user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```'

        chat_completion = client.chat.completions.create(
            messages=[
                {'role': 'system', 'content': SYSTEM_PROMPT},
                {'role': 'user', 'content': user_prompt}
            ],
            model='gpt-3.5-turbo',
            temperature=0,
            max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE,
        )
        gpt_response = chat_completion.choices[0].message.content
        gpt_response = [category.strip() for category in gpt_response.split('|')]
        categories[idx:idx + len(gpt_response)] = gpt_response
    df['category'] = categories
    df.dropna(subset=['category'], inplace=True)
    return df

Cost estimations

OpenAI API pricing is listed here. They charge for the number of tokens in the request and the response. A token is a unit of text roughly proportional to the number of characters. There are open-source packages to estimate the number of tokens in a given text, for example, Tiktoken. If you want to evaluate it manually, the place to go is the official OpenAI tokenizer here.

In our dataset, there are ~18K titles. Roughly, that equals 320K input tokens (180K for titles and 140K for system prompts, with batch size = 5) and 50K output tokens. Depending on the model, the costs for the full scan will be:

  1. GPT-4 Turbo: $4.7. Pricing: input: $10 / 1M tokens; output: $30 / 1M tokens.

  2. GPT-4: $12.6. Pricing: input: $30 / 1M tokens; output: $60 / 1M tokens.

  3. GPT-3.5 Turbo: $0.2. Pricing: input: $0.5 / 1M tokens; output: $1.5 / 1M tokens.
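These figures follow from straightforward arithmetic; spelled out in Python (prices per 1M tokens as listed above; the GPT-3.5 Turbo total comes out to about $0.24 before rounding):

```python
INPUT_TOKENS = 320_000   # ~180K for titles + ~140K for system prompts
OUTPUT_TOKENS = 50_000

# (input, output) price in $ per 1M tokens, as listed above
PRICING = {
    'gpt-4-turbo': (10, 30),
    'gpt-4': (30, 60),
    'gpt-3.5-turbo': (0.5, 1.5),
}

for model_name, (price_in, price_out) in PRICING.items():
    cost = INPUT_TOKENS / 1e6 * price_in + OUTPUT_TOKENS / 1e6 * price_out
    print(f'{model_name}: ${cost:.2f}')
```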

Results

The dbt model worked like a charm. I successfully categorized all 18K packages without any gaps. The model proved to be cost-efficient and protected against repeated dbt runs.

I published the results dashboard to Tableau Public here. Feel free to play with it, download the data, and create whatever you desire on top of it.

Some interesting details I found:

  • Top-1 category is Data Visualization (1,190 packages, or 6%). I guess this proves the popularity of R as a visualization tool, especially with packages like Shiny, Plotly, and others.

  • The top two growing categories in 2023 were Data Import and Data Processing. It sounds like R is increasingly used as a data-processing tool

  • The biggest year-to-year growth among the top 30 categories was Natural Language Processing in 2019. Two years after the famous paper "Attention Is All You Need" and half a year after the GPT-1 release :)

Further ideas

  1. We can use an alternative approach: GPT embeddings

    • It's much cheaper

    • But more engineering-heavy because you should do the classification part on your own (stay tuned, as I'm going to explore this option in one of the next posts)

  2. Certainly, it makes sense to move this part out of dbt and into cloud functions or whatever infra you use. At the same time, if you'd like to keep it under dbt, this post has you covered

  3. Avoid adding any other logic to the model. It should do one job: call the LLM and save the result. This will help you avoid re-running it

  4. Chances are high that you are using multiple environments in your dbt project. Be mindful and avoid running this model over and over in every developer environment on every Pull Request

    • To do this, you can incorporate logic with if dbt.config.get("target_name") == 'dev'
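For example, with a hypothetical helper along these lines (inside the model you would pass dbt.config.get("target_name") as target_name; the sampling policy is an assumption, not from the original project):

```python
def limit_rows_for_env(rows, target_name, dev_limit=10):
    # In developer environments, send only a small sample to the API
    # to keep experiment costs near zero; production gets the full set
    if target_name == 'dev':
        return rows[:dev_limit]
    return rows

titles = [f'package_{i}' for i in range(100)]
print(len(limit_rows_for_env(titles, 'dev')))   # 10
print(len(limit_rows_for_env(titles, 'prod')))  # 100
```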
  5. Responses with a delimiter can be unstable

    • For example, GPT can return fewer elements than you expected, and it will be hard to map initial titles to the list of categories

    • To overcome this, add response_format={ "type": "json_object" } in your request to require JSON output. See official docs

    • With JSON output, you can ask in prompt to provide an answer in the format {"title": "category"}, and then map it to your initial values

    • Note it will be more expensive, as it will increase response size

    • Strangely enough, the quality of classification dropped dramatically when I switched to JSON for GPT 3.5 Turbo
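A sketch of mapping a JSON-mode response back to the original titles (the response content here is illustrative, and the helper is mine):

```python
import json

def map_json_response(raw_response, titles):
    # Parse the {"title": "category"} object and align categories with the
    # original title order; titles missing from the answer get None and
    # can be retried on a later run
    mapping = json.loads(raw_response)
    return [mapping.get(title) for title in titles]

raw = '{"ggplot helpers": "Data Visualization", "fast csv reader": "Data Import"}'
print(map_json_response(raw, ['ggplot helpers', 'fast csv reader', 'unknown pkg']))
# ['Data Visualization', 'Data Import', None]
```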

  6. There is an alternative in Snowflake — using cortex.complete() function. Check out a great post by Joel Labes on the dbt blog

That's it! Let me know what you think.

Full code on GitHub: link

Tableau Public dashboard: link

TidyTuesday R dataset: link


Written by Kliment Merzlyakov