How to Develop a Real-Time ETL Pipeline Using Streams and Tasks on Snowflake


In modern data ecosystems, real-time ETL (Extract, Transform, Load) pipelines are crucial for ensuring data freshness and supporting timely decision-making. Snowflake, with its powerful features like Streams and Tasks, provides a straightforward way to implement real-time or near-real-time ETL pipelines. This article will guide you through the steps to set up such a pipeline, leveraging these features effectively.
What Are Streams and Tasks in Snowflake?
Streams: Streams in Snowflake track DML changes (inserts, updates, deletes) to a table and expose them as a delta, so you can process only the rows that have changed since the stream was last consumed.
Tasks: Tasks allow you to schedule and automate the execution of SQL statements, such as stored procedures, to perform transformations.
By combining Streams and Tasks, you can create a pipeline that processes data incrementally and automatically triggers downstream actions, ensuring near-real-time updates.
Step-by-Step Guide
Step 1: Set Up Source and Target Tables
Create a source table to simulate incoming data and a target table where the transformed data will be stored.
-- Create the source table
CREATE OR REPLACE TABLE source_table (
    id INT,
    name STRING,
    created_at TIMESTAMP
);

-- Create the target table
CREATE OR REPLACE TABLE target_table (
    id INT,
    name STRING,
    formatted_date STRING
);
Step 2: Create a Stream on the Source Table
A stream captures the changes in the source table. We'll create a stream to monitor the source_table.
CREATE OR REPLACE STREAM source_table_stream ON TABLE source_table;
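Before wiring up any automation, you can query the stream like a table to see what it has captured. The metadata columns below are the standard stream columns, and a plain SELECT does not consume the stream; only DML against it advances the offset.
-- Peek at pending changes without consuming the stream
SELECT id, name, created_at,
       METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM source_table_stream;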
Step 3: Create a Stored Procedure for Transformation
The stored procedure will process the data captured by the stream and insert transformed data into the target table. In this example, we will transform the created_at timestamp into a formatted date string.
CREATE OR REPLACE PROCEDURE process_streamed_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- Insert transformed data from the stream into the target table.
    -- Consuming the stream in DML advances its offset when the transaction
    -- commits, so each change is processed only once.
    INSERT INTO target_table (id, name, formatted_date)
    SELECT id, name, TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI:SS')
    FROM source_table_stream
    WHERE METADATA$ACTION = 'INSERT';
    RETURN 'Data processed';
END;
$$;
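As a quick sanity check before automating anything, you can run the procedure by hand (assuming you have already inserted a few test rows into source_table):
-- Run the transformation once manually
CALL process_streamed_data();
-- Confirm the rows landed in the target table
SELECT * FROM target_table;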
Step 4: Create a Task to Automate the Pipeline
The task uses the SYSTEM$STREAM_HAS_DATA function in its WHEN clause, so it only runs (and only resumes the warehouse) when the stream actually contains new changes, and then calls the stored procedure.
CREATE OR REPLACE TASK process_data_task
    WAREHOUSE = my_warehouse  -- assumes a warehouse named my_warehouse exists
    SCHEDULE = '1 MINUTE'
    -- Only run when the stream has captured new changes
    WHEN SYSTEM$STREAM_HAS_DATA('source_table_stream')
AS
    CALL process_streamed_data();
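If a fixed interval doesn't fit your use case, tasks also accept a CRON expression instead of an interval. The sketch below (the task name process_data_task_cron is just illustrative) would check the stream every five minutes in UTC:
-- Alternative: run on a CRON schedule instead of a fixed interval
CREATE OR REPLACE TASK process_data_task_cron
    WAREHOUSE = my_warehouse
    SCHEDULE = 'USING CRON */5 * * * * UTC'
    WHEN SYSTEM$STREAM_HAS_DATA('source_table_stream')
AS
    CALL process_streamed_data();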
Step 5: Enable the Task
Tasks are created in a suspended state, so resume the task to start the schedule and activate the pipeline.
ALTER TASK process_data_task RESUME;
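To confirm the task is actually running on schedule, you can inspect its state and recent executions. TASK_HISTORY is an INFORMATION_SCHEMA table function; the TASK_NAME filter shown here is optional.
-- Check the task's current state (should be 'started')
SHOW TASKS LIKE 'process_data_task';
-- Review recent runs of the task
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'process_data_task'))
ORDER BY scheduled_time DESC
LIMIT 10;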
Testing the Pipeline
Insert data into the source_table:
INSERT INTO source_table VALUES (1, 'Alice', CURRENT_TIMESTAMP), (2, 'Bob', CURRENT_TIMESTAMP);
Check the target_table after a minute to see the transformed data:
SELECT * FROM target_table;
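Once the task has run, the stream should be empty again because the INSERT inside the procedure consumed it. You can confirm this with:
-- Returns FALSE once the task has consumed the pending changes
SELECT SYSTEM$STREAM_HAS_DATA('source_table_stream');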
Conclusion
With Snowflake's Streams and Tasks, you can build efficient real-time ETL pipelines that are both simple to implement and highly scalable. By processing only incremental data and automating transformations, you can ensure timely and accurate data updates for your business needs.
You can also leverage Snowflake's Dynamic Tables feature, which simplifies the implementation by combining the functionality of streams and tasks into a single declarative object. It does, however, come with trade-offs of its own, particularly around refresh performance tuning and restrictions on non-deterministic functions.
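As a rough sketch of that alternative (the table name and lag value here are illustrative), the stream, task, and procedure above could be collapsed into a single declarative definition:
-- Declarative equivalent: Snowflake manages the incremental refresh
CREATE OR REPLACE DYNAMIC TABLE target_table_dt
    TARGET_LAG = '1 minute'
    WAREHOUSE = my_warehouse
AS
    SELECT id, name, TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI:SS') AS formatted_date
    FROM source_table;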
Feel free to experiment with this setup and tailor it to your specific use case. Happy coding!
Written by Omkar Mozar