Snowflake Streams and Tasks: Best Practices

John Ryan
13 min read

As a Data Engineer, it’s vital to understand the techniques available for Change Data Capture (CDC). These methods allow you to quickly identify whether incoming data has changed and whether you need to process the new/modified data.

Snowflake Streams are a straightforward built-in method for capturing data changes, while Tasks automate the execution of SQL workflows. Together, they help you build efficient, scalable, and robust pipelines.

This guide will explore Streams and Tasks, their workings, and their practical use. It will include example code and real-world use cases. Unique to this article, I’ll also describe how they work under the hood. Finally, we’ll discuss best practices for using them.

What problem are we trying to solve?

The diagram below illustrates a simple ELT pipeline. It shows files loaded into a Staging Table using Snowpipe, before a batch ETL process aggregates the data into a Target Table for end-user queries.

Although simple, this pipeline hides a few problems:

  • Identifying Changes: As new files are loaded into the Staging Table, the rows are automatically appended to the existing data. We therefore need to track which rows we’ve already processed to ensure we don’t end up with duplicate values in the Target Table.

  • Scheduling: Assuming this is a batch process, we must regularly schedule the overall process. This could run once per minute, per hour, or day.

  • Avoid Waste: While we expect lots of new rows during peak hours, we can expect little or no activity at 4 a.m. Ideally, we should do nothing if there are no rows to process. Likewise, when only a few rows need 1-2 seconds of processing, we want to avoid the 60-second minimum charge incurred each time a Virtual Warehouse resumes. To put this in context, a single ELT job running every five minutes on a Medium warehouse could waste up to $20,000 a year if it frequently has little or no work to do.

As you can see, a simple challenge includes several underlying problems. How can Streams and Tasks help here?

ELT Pipelines with Streams and Tasks

The diagram below shows how, using two simple constructs, Streams and Tasks, we can efficiently solve the problems above.

The above solution includes the following components:

  • A Stream: Acting like a view over a table, it returns the changes made to that table. In this case, it returns only the new rows added since the data was last processed. There is no need to track timestamps or record which rows were processed - Snowflake handles that.

  • A Task provides a simple mechanism for scheduling a regular process (e.g., a Stored Procedure or a sequence of SQL statements) to process the rows.

Best of all, the Task can be run as a “Serverless Process,” which means you don’t need to deploy a Virtual Warehouse. Billing is per CPU core, per second, so you avoid the 60-second minimum charge incurred each time a warehouse resumes. Equally, if there are no rows to process, the task is skipped at zero cost.

Finally, serverless tasks are executed at a 10% discount compared to virtual warehouses, reducing execution costs.

In summary, it’s a straightforward way to identify and process new entries regularly. It’s also remarkably cost-effective compared to scheduling a set of SQL scripts or a Stored Procedure in a virtual warehouse.

How do I create a stream?

The following SQL shows how to create a Snowflake Stream on the SALES table.

create stream sales_stream
   on table sales;

To query the sales stream, execute:

select *
from sales_stream;

Rather than returning all the SALES rows, this returns only the rows inserted, updated, or deleted since the last time the data was processed.

How do Streams Work under the hood?

The diagram below illustrates the difference between a STREAM and the underlying TABLE. Like a view, the Stream doesn’t store data but includes a pointer to the table micro-partitions. Initially, this is at the beginning of the table.

If you were to query the Stream (rather than the table), you’d see the data plus additional columns of automatically generated metadata, as illustrated below:

In addition to the CUSTOMER rows, you’d see virtual columns unique to the Stream. In the above example, METADATA$ACTION indicates these rows were inserted and have not yet been processed.

As standard, you get the following additional columns:

  • METADATA$ACTION: Indicates whether the action against the row was an INSERT or DELETE operation. Note: Updates are handled as a two-step operation: a DELETE of the existing row and an INSERT of the new version, which means updates return two rows from a stream.

  • METADATA$ISUPDATE: Indicates whether the row was part of an update operation. If this is FALSE, it can be ignored, but if TRUE, the outcome depends upon METADATA$ACTION. A value of DELETE indicates this row is the before image of an update, and a value of INSERT indicates the after image.

  • METADATA$ROW_ID: A unique, generated identifier for each row, which can be used to track changes to a specific row over time.

The diagram below illustrates the potential values:

An example using a Stream

Streams can, therefore, be used to identify when data has been changed by INSERT, UPDATE, or DELETE operations, including TRUNCATE and MERGE statements. Furthermore, a single table can have multiple streams, and separate ELT processes can each use their own stream to process the same data independently, as each stream is tracked separately.

However, the real power of streams is how we can use them in SELECT statements to filter entries. For example:

select *
from customer_stream
where metadata$action = 'INSERT'
and   metadata$isupdate = FALSE;

The above statement fetches only the rows inserted since the last time the data was processed; UPDATE and DELETE operations are ignored.

select * 
from customer_stream 
where metadata$action = 'DELETE' 
and   metadata$isupdate = TRUE;

Equally, the above SQL statement returns the before-image values of any rows that were updated. This is incredibly useful when maintaining a Type 2 Slowly Changing Dimension (SCD2) table from a table of CUSTOMER rows, where the before image drives an UPDATE statement to close off the existing rows.
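As a minimal sketch (the CUSTOMER_HISTORY table and its CUSTOMER_ID, EFFECTIVE_TO, and CURRENT_FLAG columns are purely illustrative), the before-image rows can drive the close-off step:

-- Close off the current SCD2 row for each updated customer.
-- Table and column names are illustrative.
update customer_history
set    effective_to = current_timestamp(),
       current_flag = false
from   customer_stream s
where  customer_history.customer_id  = s.customer_id
and    customer_history.current_flag = true
and    s.metadata$action   = 'DELETE'
and    s.metadata$isupdate = TRUE;

Note that, as explained later, any DML statement that reads from the stream advances its pointer on commit, so in practice the close-off and the insert of the new row versions are combined in a single transaction (or a MERGE statement).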

How do I create a task?

The SQL below shows how to create a simple task that is executed every five minutes.

create or replace task elt_process_sales
   schedule = '5 MINUTE'
   when system$stream_has_data('SALES_STREAM')
as
   insert into processed_sales
   select *
   from sales_stream;

Notice a few points about this task:

  1. No Virtual Warehouse: Although you could indicate a virtual warehouse to execute this on, running this as a serverless process means Snowflake will bill per-second-per-CPU-core rather than by warehouse size. It also bills at 90% of the warehouse cost, making it even more efficient.

  2. No data, no problem: Because this task includes a WHEN clause referencing SYSTEM$STREAM_HAS_DATA, the task is skipped whenever the stream is empty, and skipped runs incur no charge, which minimizes cost.

  3. Flexible Scheduling: Although this task is executed every five minutes, we have the full power of CRON scheduling, which provides incredible flexibility over when tasks are executed.
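For example, here is a sketch of switching the task above to a CRON schedule that runs at the top of every hour during weekday business hours (the specific schedule is purely illustrative):

-- Suspend the task, change the schedule, then resume it.
alter task elt_process_sales suspend;

-- Top of every hour, 6 a.m. to 6 p.m., weekdays, New York time.
alter task elt_process_sales
   set schedule = 'USING CRON 0 6-18 * * MON-FRI America/New_York';

alter task elt_process_sales resume;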

Can Tasks be Dependent on Others?

Absolutely. This is both a powerful feature and a potential source of complexity. The diagram below illustrates how, although the “root” task must be scheduled, subsequent tasks can depend upon prior tasks and the successful completion of multiple tasks.

The image below shows the code needed and the implied dependencies, indicating how to create a potentially complex dependency tree or DAG.
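As a sketch of the syntax (the task names and SQL bodies are illustrative), only the root task carries a schedule, while dependent tasks use the AFTER clause:

-- Root task: the only task with a schedule.
create or replace task root_task
   schedule = 'USING CRON */10 * * * * UTC'
as
   insert into staging_sales select * from raw_sales;

-- Dependent task: runs only after ROOT_TASK completes successfully.
create or replace task aggregate_sales
   after root_task
as
   insert into sales_summary
   select sale_date, sum(amount)
   from   staging_sales
   group by sale_date;

-- Tasks are created suspended: resume the children first, then the root.
alter task aggregate_sales resume;
alter task root_task resume;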

You should, however, be cautious about creating complex dependencies between tasks. It’s easy to make a spider's web of dependencies that quickly become difficult to manage and debug.

When is the Stream Reset?

If you’re watching closely, you’ll see I’ve explained how to query a stream, but not when the pointer is moved forward to avoid processing the same data twice.

As new rows are added, the stream pointer (which uses the same versioning mechanism as Time Travel) stays static, which means querying the stream returns the additional rows. So what happens if we execute the following SQL?

insert into processed_customers
select *
from   customer_stream;

Any Data Manipulation Language (DML) operation - Insert, Update, Delete, or Merge - that references a Stream moves the pointer forward when the transaction commits and marks the rows as processed.

This is illustrated in the diagram below:

The diagram above illustrates that when the INSERT statement is committed, it automatically moves the Stream Pointer to the end of the micro-partitions processed, which means the stream won’t return them.

This also explains why you may need multiple streams on the same table if you want to independently process the same data multiple times to track changes and write to separate tables.
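For example (the stream names are illustrative), each consumer gets its own independently tracked stream:

-- Two independent streams over the same table: consuming one
-- does not advance the other.
create stream sales_stream_finance   on table sales;
create stream sales_stream_marketing on table sales;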

Alternatives to Snowflake Streams?

The best alternative to Streams is Snowflake Dynamic Tables, but for even more flexibility, consider using the CHANGES clause within a SQL statement.

Using this method, you don’t need to create a Stream to track changes, and you have the flexibility to query and re-query by timestamp. To enable change tracking on a table, execute the following:

-- Enable change tracking on the table. 
alter table sales 
   set change_tracking = true;

The above statement adds a pair of hidden metadata columns to the table, which allows Snowflake to use the CHANGES clause as follows:

select * 
from sales
   changes(information => append_only)
   at(timestamp => <TIMESTAMP>);

The above command returns only the changes on or after the given TIMESTAMP. In this case, because we used the APPEND_ONLY option, it returns only the newly added rows, but we could specify DEFAULT instead to identify inserts, updates, and deletes.

While this is even more flexible than Streams, you must track the timestamps yourself.

Should You Use Streams?

Streams are a remarkably flexible way to identify data changes in Snowflake tables. You can identify new rows (inserts), changes (merge or updates), and removals (deletes and truncates) and apply the changes downstream.

When combined with Tasks, they are a remarkably simple and efficient way to deliver near real-time pipelines, although they work equally well for micro-batch or batch operations.

Tasks (although independent of Streams) can be executed on a flexible CRON schedule using either a virtual warehouse or serverless compute, making streams even more powerful.

My analysis of Fortune 500 customers spending upwards of $1m per year on Snowflake virtual warehouses indicates that warehouses running frequent, short batch transformations wasted between 25% and 60% of their credits on idle time. This results from frequently repeated queries, and even setting AUTO_SUSPEND to 60 seconds didn’t eliminate the problem.

Effectively, each time a process is completed on a Virtual Warehouse, there’s a period of idle time after which the warehouse is suspended. This idle time wastage is more prevalent during off-peak hours when few queries are executed. Frequently repeated jobs (e.g., every 2 minutes) exacerbate the situation, and each time the warehouse size is increased, the cost of the idle time is doubled.

Executing repeating, short tasks as Serverless Tasks is a remarkably efficient way to deploy frequently repeating transformation processes.

An alternative (and, I think, a better one) is to use Dynamic Tables. Although these don’t have quite the same flexibility to identify change types, they have distinct advantages.
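As a minimal sketch (the table contents, warehouse name, and target lag are illustrative), a Dynamic Table simply declares the transformation and a target freshness, and Snowflake handles the incremental refresh:

create or replace dynamic table sales_summary
   target_lag = '5 minutes'
   warehouse  = transform_wh
as
   select sale_date,
          sum(amount) as total_sales
   from   sales
   group by sale_date;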

Finally, for complete flexibility, consider using the CHANGES clause described above, which has the advantage of allowing data to be re-processed, unlike streams, which only move the pointer forward.

Streams and Tasks Use Cases

The best use cases I’ve seen for using Streams and Tasks include:

  • Real-Time Ingestion: The diagram below illustrates a typical scenario whereby data is ingested using Snowpipe or Snowpipe Streaming, automatically appending data to a staging table. Deploying a stream on the staging table means it’s easy to identify the incremental changes and avoids reprocessing history.

  • Ingestion from External Tables: The SQL code below illustrates the statement needed to create a stream on an External Table. While External Tables can be queried in place, the data must sometimes be loaded into Snowflake. Creating a stream over an External Table returns only the newly added data, which can then be fetched and loaded into tables while avoiding duplicate rows.
create stream new_sales_stream
  on external table sales
  insert_only = TRUE;
  • Building Slowly Changing Dimensions: Because Streams can be used to track Inserts, Updates, or Deletes, including the before-and-after image of updates, it’s easy to maintain SCD2 tables from source changes, which is a common use case.

Streams and Tasks: Best Practices

  • Streams and Tasks are Independent: Although these two work well together, it’s important to understand that they are independent. You can, for example, use Apache Airflow instead of Tasks, or schedule regular processing using Tasks without any Streams at all.

  • Use Append-only Streams: In many cases (for example, processing newly loaded data), you only need to identify the newly inserted rows. In these cases, set the APPEND_ONLY parameter, meaning the stream ignores UPDATE and DELETE operations (see the sketch after this list). An append-only stream has significant performance advantages over the default, making it a compelling, simple way of maximizing ETL performance.

  • Avoid Complex Task Dependencies: My experience has demonstrated how complex ELT systems can become over time. It’s best to avoid overly complex dependencies, which can quickly turn into an unmanageable spider's web. While you often cannot avoid dependencies (for example, between Dimension and Fact table changes), you should consider whether interconnected pipelines can be simplified.

  • Use the Snowsight Task Graph to show the status of each task and illustrate the dependencies visually. Keep a close eye on complex DAGs, or you’ll risk your ELT pipeline dependencies becoming unmanageable, as a single failure early in the process can impact the entire downstream process.

  • Minimize Cost using Serverless Tasks: While you can use an existing Virtual Warehouse, deploying a Serverless Task uses Snowflake-managed compute resources and benefits from a 10% discount. Using USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE, you can indicate an initial warehouse size, and Snowflake will track usage and adjust the compute size automatically over time to get the best balance of throughput, performance, and cost (also sketched after this list).

  • Integrate Tasks with Existing Orchestration: Using Snowflake Tasks is not an all-or-nothing approach. If you’re already using, for example, Apache Airflow, you can still gain the benefits of Serverless Tasks by using the Execute Task command. This means you can combine the benefits of an existing DAG with the cost-effectiveness benefits of Serverless Tasks.

  • Carefully choose Warehouse Size: When executing repeating batch processes on a named warehouse, carefully consider the Snowflake recommended best practices for warehouse considerations to define the correct warehouse size and configuration. In my experience, any batch transformation job that runs for less than a minute is more cost-effective when executed as a Serverless Task.

  • Consider alternatives to Streams: Remember, “If the only tool you have is a hammer, you tend to see every problem as a nail.” Be aware of the alternative solutions mentioned above, most notably Snowflake Dynamic Tables and the CHANGES clause. Dynamic Tables, in particular, have significant advantages in terms of simplicity, which are worth considering.

  • Avoid full history re-processing: Streams and Dynamic Tables are designed to simplify identifying data changes, meaning transformation tasks can work on incremental data changes rather than re-processing the entire history. While applying incremental changes may be slightly more effort, this often leads to a massive cost reduction compared to complete data processing each time. Be aware that Snowflake is entirely pay-for-use, and as data volumes inevitably grow, you’ll increase the time taken to process the data and pay for the additional computing. I’ve worked with customers who reduced costs by over $100,000 by converting re-processing to incremental ETL.
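To illustrate a couple of the options above (the object names, columns, and warehouse size are illustrative), here is a sketch of an append-only stream, a serverless task with an initial managed warehouse size, and running a task on demand from an external orchestrator:

-- Append-only stream: tracks INSERTs only, ignoring UPDATEs and DELETEs.
create or replace stream staged_sales_stream
   on table staged_sales
   append_only = true;

-- Serverless task: no warehouse is named; Snowflake starts at the given
-- size and adjusts the managed compute over time.
create or replace task process_staged_sales
   user_task_managed_initial_warehouse_size = 'XSMALL'
   schedule = '10 MINUTE'
   when system$stream_has_data('STAGED_SALES_STREAM')
as
   insert into sales (sale_id, sale_date, amount)
   select sale_id, sale_date, amount
   from staged_sales_stream;

-- Run a task on demand, e.g., from an external orchestrator such as Airflow.
execute task process_staged_sales;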

Conclusion

Snowflake Streams and Tasks are indispensable tools for modern data engineering. With Streams, you can track every data change with precision, and with Tasks, you can seamlessly automate complex workflows.

Following this guide's examples and best practices, you can build scalable, real-time data pipelines that power everything from analytics to customer notifications. Whether you’re processing millions of transactions or building simple ETL workflows, Streams and Tasks give you the flexibility and control you need.

Do you use Slack at work? Can you Help Me?

Over the past five years, my Snowflake articles have helped over 250,000 IT professionals understand this fantastic technology. Show off your Snowflake knowledge to your colleagues and help me reach a million by sharing the link below on your company Slack channel.

See this great article on Snowflake Streams and Tasks by John Ryan at Analytics Today: https://Articles.Analytics.Today/snowflake-streams-and-tasks-best-practices

Thanks again for reading this, and look out for more great articles at Analytics.Today


Curious how Snowflake is used by the top Fortune 500?

Want to know…

  1. How we loaded 350TB from 500,000+ tables for just $576 in only 3 days.

  2. How one customer saved $100,000 a year on storage with a simple change.

  3. How customers get queries 741 times faster, from 21 minutes to 1.7 seconds.

  4. How Snowflake caches data - and why it (mostly) doesn't matter.

Accelerate your Snowflake career with Snowflake Training by Analytics Today. Prepare yourself for fantastic opportunities ahead. Click the image below to take the first step.


Written by

John Ryan

After 30 years of experience building multi-terabyte data warehouse systems, I spent five years at Snowflake as a Senior Solution Architect, helping customers across Europe and the Middle East deliver lightning-fast insights from their data. In 2023, I joined Altimate.AI, which uses generative artificial intelligence to provide Snowflake performance and cost optimization insights and maximize customer return on investment. My certifications include Snowflake Data Superhero, Snowflake Subject Matter Expert, SnowPro Core, and SnowPro Advanced Architect.