Quick Tip: Validate runMultiple DAG in Fabric

Sandeep Pawar

First, in case you haven't noticed, mssparkutils has been officially renamed to notebookutils. Check out the official documentation for details, and be sure to update your notebooks to use notebookutils.

I have written about runMultiple before. It lets you run multiple notebooks in parallel following an orchestration pattern you define, including dependencies between notebooks. notebookutils now also has a validateDAG method that checks whether the DAG follows the expected JSON structure, which makes it a helpful check to run before executing runMultiple.
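A minimal sketch of that validate-then-run pattern, assuming you want runMultiple to be skipped entirely when validation fails. `run_if_valid` is a hypothetical helper of my own, not part of notebookutils. It takes `notebookutils.notebook` as an argument so it can be tried outside Fabric with a stub, and it treats both a falsy return value and a raised exception as a failed validation, so it doesn't depend on exactly how validateDAG reports problems.

```python
from typing import Any


def run_if_valid(dag: dict, nb: Any) -> Any:
    """Validate a runMultiple DAG and only execute it if validation passes.

    `nb` is expected to be `notebookutils.notebook` inside Fabric; it is a
    parameter so the helper can be exercised with a stub object elsewhere.
    """
    try:
        is_valid = nb.validateDAG(dag)
    except Exception as exc:  # validation problems may surface as an exception
        raise ValueError(f"DAG failed validation: {exc}") from exc
    if not is_valid:
        raise ValueError("DAG failed validation")
    # Only reached when validateDAG reported the DAG as valid
    return nb.runMultiple(dag)
```

In a Fabric notebook you would call it as `run_if_valid(DAG, notebookutils.notebook)`.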

Example:

I will use the same DAG I used in my previous blog post.

DAG = {
    "activities": [
        {
            "name": "extract_customers", 
            "path": "extract_customers", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 1000},
        },
        {
            "name": "extract_products", 
            "path": "extract_products", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 5000},
        },
        {
            "name": "extract_offers", 
            "path": "extract_offers", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 1000},
        },
        {
            "name": "extract_leads", 
            "path": "extract_leads", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 100000},
        },
        {
            "name": "customer_table",
            "path": "customer_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_customers"]
        },
        {
            "name": "products_table",
            "path": "products_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_products"]
        },
        {
            "name": "leads_table",
            "path": "leads_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_leads","customer_table", "products_table"]
        },
        {
            "name": "offers_table",
            "path": "offers_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_offers","customer_table", "products_table"]
        },
        {
            "name": "refresh_dataset",
            "path": "refresh_dataset",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["customer_table","products_table","leads_table","offers_table"]
        }

    ],
    "timeoutInSeconds": 3600, # max 1 hour for the entire pipeline
    "concurrency": 5 # max 5 notebooks in parallel
}

notebookutils.notebook.validateDAG(DAG)
# Output: True
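To see the orchestration pattern this DAG describes, here is a small pure-Python sketch that groups activities into "waves", where each wave depends only on activities in earlier waves. `execution_waves` is a hypothetical helper, not a notebookutils API, and it only reads `name` and `dependencies`, so the sample keeps just those fields from the DAG above. It illustrates dependency order only; it is not a description of how runMultiple actually schedules notebooks, which also has to respect `concurrency` and the timeouts.

```python
def execution_waves(dag: dict) -> list[list[str]]:
    """Group activities into waves; each wave depends only on earlier waves."""
    deps = {a["name"]: set(a.get("dependencies", [])) for a in dag["activities"]}
    done: set[str] = set()
    waves: list[list[str]] = []
    while len(done) < len(deps):
        # Activities whose dependencies have all been placed in earlier waves
        ready = sorted(n for n, d in deps.items() if n not in done and d <= done)
        if not ready:
            # No progress possible: a cycle or a dependency on an undefined name
            raise ValueError("cycle or undefined dependency among: "
                             + ", ".join(sorted(set(deps) - done)))
        waves.append(ready)
        done.update(ready)
    return waves


SAMPLE_DAG = {
    "activities": [
        {"name": "extract_customers"},
        {"name": "extract_products"},
        {"name": "extract_offers"},
        {"name": "extract_leads"},
        {"name": "customer_table", "dependencies": ["extract_customers"]},
        {"name": "products_table", "dependencies": ["extract_products"]},
        {"name": "leads_table",
         "dependencies": ["extract_leads", "customer_table", "products_table"]},
        {"name": "offers_table",
         "dependencies": ["extract_offers", "customer_table", "products_table"]},
        {"name": "refresh_dataset",
         "dependencies": ["customer_table", "products_table", "leads_table", "offers_table"]},
    ]
}

for i, wave in enumerate(execution_waves(SAMPLE_DAG), start=1):
    print(i, wave)
```

This prints four waves: the four extract notebooks, then customer_table and products_table, then leads_table and offers_table, and finally refresh_dataset. Passing the full `DAG` from above should give the same result, since the other keys are ignored.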

If I add a dependency on an activity that isn't defined in the DAG, validation fails.

INVALID_DAG = {
    "activities": [
        {
            "name": "extract_customers", 
            "path": "extract_customers", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 1000},
        },
        {
            "name": "extract_products", 
            "path": "extract_products", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 5000},
        },
        {
            "name": "extract_offers", 
            "path": "extract_offers", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 1000},
        },
        {
            "name": "extract_leads", 
            "path": "extract_leads", 
            "timeoutPerCellInSeconds": 120,
            "args": {"rows": 100000},
        },
        {
            "name": "customer_table",
            "path": "customer_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["THIS_NOTEBOOK_DOES_NOT_EXIST"] # INVALID: no activity has this name
        },
        {
            "name": "products_table",
            "path": "products_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_products"]
        },
        {
            "name": "leads_table",
            "path": "leads_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_leads","customer_table", "products_table"]
        },
        {
            "name": "offers_table",
            "path": "offers_table",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["extract_offers","customer_table", "products_table"]
        },
        {
            "name": "refresh_dataset",
            "path": "refresh_dataset",
            "timeoutPerCellInSeconds": 90,
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["customer_table","products_table","leads_table","offers_table"]
        }

    ],
    "timeoutInSeconds": 3600, # max 1 hour for the entire pipeline
    "concurrency": 5 # max 5 notebooks in parallel
}

notebookutils.notebook.validateDAG(INVALID_DAG)
# Returns an error
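If the error doesn't make the culprit obvious, a quick local check can list the offending entries. `find_missing_dependencies` is a hypothetical helper, and it assumes, as in the example above, that each entry in `dependencies` refers to another activity's `name`.

```python
def find_missing_dependencies(dag: dict) -> dict[str, list[str]]:
    """Map each activity name to the dependencies that match no activity name."""
    names = {a["name"] for a in dag["activities"]}
    missing: dict[str, list[str]] = {}
    for activity in dag["activities"]:
        unknown = [d for d in activity.get("dependencies", []) if d not in names]
        if unknown:
            missing[activity["name"]] = unknown
    return missing


# Trimmed-down version of INVALID_DAG: only the fields this check reads
example_dag = {
    "activities": [
        {"name": "extract_customers"},
        {"name": "customer_table", "dependencies": ["THIS_NOTEBOOK_DOES_NOT_EXIST"]},
        {"name": "refresh_dataset", "dependencies": ["customer_table"]},
    ]
}

print(find_missing_dependencies(example_dag))
# {'customer_table': ['THIS_NOTEBOOK_DOES_NOT_EXIST']}
```

Called on the full INVALID_DAG, it should flag only customer_table, because every other dependency there matches a defined activity.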

Note that the validation is not exhaustive. For example, you could set concurrency to -5, which is invalid because it has to be a positive number, yet validateDAG will not flag it as an error. Still, it's a very handy check.
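Since validateDAG doesn't catch everything, you could pair it with a few checks of your own. `extra_dag_checks` is a hypothetical helper. Apart from concurrency having to be positive, the rules it enforces are my assumptions about sensible values, not documented runMultiple constraints: positive `timeoutInSeconds` and `timeoutPerCellInSeconds`, non-negative `retry` and `retryIntervalInSeconds`, and unique activity names. I haven't confirmed which of these validateDAG already covers.

```python
def _is_int(value) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly
    return isinstance(value, int) and not isinstance(value, bool)


def extra_dag_checks(dag: dict) -> list[str]:
    """Return a list of problems; an empty list means no issues were found."""
    problems = []
    # Assumed rules: these settings must be positive integers
    for key in ("concurrency", "timeoutInSeconds"):
        if key in dag and not (_is_int(dag[key]) and dag[key] > 0):
            problems.append(f"{key} must be a positive integer, got {dag[key]!r}")
    for activity in dag["activities"]:
        name = activity["name"]
        if "timeoutPerCellInSeconds" in activity:
            value = activity["timeoutPerCellInSeconds"]
            if not (_is_int(value) and value > 0):
                problems.append(f"{name}: timeoutPerCellInSeconds must be a positive integer, got {value!r}")
        # Assumed rule: retry settings may be zero but not negative
        for key in ("retry", "retryIntervalInSeconds"):
            if key in activity and not (_is_int(activity[key]) and activity[key] >= 0):
                problems.append(f"{name}: {key} must be a non-negative integer, got {activity[key]!r}")
    names = [a["name"] for a in dag["activities"]]
    for dup in sorted({n for n in names if names.count(n) > 1}):
        problems.append(f"duplicate activity name: {dup}")
    return problems


bad = {
    "activities": [{"name": "a", "timeoutPerCellInSeconds": 90}],
    "timeoutInSeconds": 3600,
    "concurrency": -5,
}
print(extra_dag_checks(bad))
# ['concurrency must be a positive integer, got -5']
```

For the DAG at the top of this post, it should return an empty list.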
