Azure Python Time Trigger Functions: Manage File Transfer - A Comprehensive Guide


Inception
Hello everyone, this article is part of The Azure Series and builds upon Azure Python HTTP Functions: A Comprehensive Guide article. I use this series to publish-out Azure Projects & Knowledge.
Overview
In the previous Blog post, we discovered The HTTP Trigger, which fires the function once the function URL is accessed. We also noticed that there’s a list of triggers that Azure provides as a gate for our function (e.g. HTTP, Blob trigger, Time trigger, etc.)
The Time trigger is a schedule-based trigger that will fire the function periodically based on a determined time. Indeed, it’s based on the corn schedule. Hence, The Cron schedule-based is capable enough to manage the periodic function schedule time with simplicity.
Today’s Blog post will walk through a comprehensive guide implementation of Manage file transfer between Blob Containers by using scheduled Time Trigger function.
Overview - Architecture diagram components
The “eraki_eastus_timetrfunc_1001” Function will configure to be triggered every 5 minutes, Listing & copying the object files of The “source” container to The “destination” container, authenticated by Managed Identiy. This is the high overview of the solution.
Indeed, the function structure will be more complex, ensuring scalability and resiliency, as below structure:
The function will consider separation of code concerns across multiple functions and files; which ensures a better understanding for developers for future enhancements.
A separated function do one thing only and do it well.
Consider robust function documentation by using Google docstring and typehint.
Consider PEP8 Code standardization while coding.
The Time trigger function file structure will be as follows
💡The functions should be listed underutils
directory. However, Azure Function App will not consider upload directories, only files. Therefore, all Function App Python files will be listed at the root directory.💡Transferring files from one container to another will require a list of Source path URLs and destinations. To avoid hardcoding such paths, we gonna use a CSV catalog file includes the source and destination paths, Then loop through these paths.💡If your mind starts struggling and has complex concerns, Don’t worry will explain each in a moment; and remember, building a scalable infrastructure of a city has a lot of concerns as well.
├── cp_blob.py # A py file contains functions related to the copy process.
├── csv_reader.py # A py file contains functions related to the CSV file reading.
├── data_catalog.csv # A CSV file contains sources and destination paths.
├── function_app.py # The entry point - responsible for calling and handling functions depends.
├── host.json # Auto-generated - host setting file
├── local.settings.json # Auto-generated - local setting file
└── requirements.txt # Auto-generated - list all the required SDKs.
├── azure-pipelines.yml # Optionl - in case integrated with Azure DevOps pipelines.
├── README.md
The Architecture includes multiple components that require to be deployed first, as we deployed some of them in the previous Blog post (e.g. Function App and its storage account) we are not going to repeat that again. The missing components here are:
- Managed identity with “Storage Blob Data Contributor” role.
Append this role to the RBAC of the Storage account and for the Function app under Identity/User assigned.
- Create Two Blob Containers with source and destination names.
Solution Implementation Steps
The implementation steps include steps that we made in the previous article. First and foremost, consider Environment Preparation.
Let’s start-out the implementation with the following steps
Create a local time trigger function
Create a directory for our project and start initiating.
Set the function name as “eraki_time_trigger_func_demo“
Leave the schedule as is.
mkdir az_time_trigger_func_demo; cd az_time_trigger_func_demo
func new
Select a number for worker runtime:
1. dotnet (isolated worker model)
2. dotnet (in-process model)
3. node
4. python
5. powershell
6. custom
Choose option: 4
python
Found Python version 3.12.3 (python3).
The new Python programming model is generally available. Learn more at https://aka.ms/pythonprogrammingmodel
Writing requirements.txt
Writing function_app.py
Writing .gitignore
Writing host.json
Writing local.settings.json
Writing /home/user/az_time_trigger_func_demo/.vscode/extensions.json
Select a number for template:
1. Blob trigger
2. CosmosDB trigger
3. Dapr Publish Output Binding
4. Dapr Service Invocation Trigger
5. Dapr Topic Trigger
6. Blob trigger (using EventGrid source)
7. EventGrid trigger
8. EventHub trigger
9. HTTP trigger
10. Queue trigger
11. ServiceBus Queue trigger
12. ServiceBus Topic trigger
13. Timer Trigger
Choose option: 13
Function Name: [timer_trigger] eraki_time_trigger_func_demo
Schedule: [0 */5 * * * *]
Appending to /home/user/az_time_trigger_func_demo/function_app.py
The function "eraki_time_trigger_func_demo" was created successfully from the "Timer Trigger" template.
- Open in VS Code.
code .
Create & Update Function files
- Create a CSV Catalog file and place the sources and destination paths for dummy directories.
# data_catalog.csv file content
full_source_path,full_dest_path
"https://STORAGE_ACCOUNT_NAME.blob.core.windows.net/source/sourcedir01/$BLOB_NAME","https://STORAGE_ACCOUNT_NAME.blob.core.windows.net/destination/destdir01/$BLOB_NAME"
"https://STORAGE_ACCOUNT_NAME.blob.core.windows.net/source/sourcedir2/$BLOB_NAME","https://STORAGE_ACCOUNT_NAME.blob.core.windows.net/destination/destdir02/$BLOB_NAME"
"https://STORAGE_ACCOUNT_NAME.blob.core.windows.net/source/sourcedir3/$BLOB_NAME","https://STORAGE_ACCOUNT_NAME.blob.core.windows.net/destination/destdir03/$BLOB_NAME"
The looping will loop on each row and copy the files from the source to the destination row by row.
- Create a
csv_reader.py
file that contains functions related to reading and handling the CSV data catalog.
# csv_reader.py file content
"""Module to read and handel CSV file and return its values."""
import csv
import os
import logging
def csv_reader() -> list[dict]:
"""csv reader function.
Reads a CSV file containing source and destination paths.
The function reads data from a file named 'data_catalog.csv' located in a parent
directory relative to the current file. It parses each row into a dictionary:
'src_path', 'dest_path'
Returns:
list[dict]: A list of dictionaries, each containing:
- src_path (str): Full source path.
- dest_path (str): Full destination path.
Raises:
FileNotFoundError: If the CSV file is not found.
csv.Error: If there is an error parsing the CSV file.
Exception: For any other unexpected errors.
"""
csv_path = os.path.join(os.path.dirname(__file__), "data_catalog.csv")
paths_list = []
try:
with open(csv_path, "r", newline="", encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
logging.info("Start reading CSV file %s", csv_path)
for row in reader:
assert "full_source_path" in row and isinstance(
row["full_source_path"], str
), "full_source_path not found or not a string in row: {}".format(row)
assert "full_dest_path" in row and isinstance(
row["full_dest_path"], str
), "full_dest_path not found or not a string in row: {}".format(row)
paths_list.append(
{
"src_path": row["full_source_path"],
"dest_path": row["full_dest_path"],
}
)
logging.info("Result of reading/looping CSV file: %s", paths_list)
except FileNotFoundError:
logging.error("File not found: %s", csv_path)
except csv.Error as e:
logging.error("Error reading CSV file: %s", e)
assert paths_list is not None
return paths_list
def csv_src_path_handler(paths_list, blob_names):
"""csv_src_path_handler function.
Replaces $BLOB_NAME in source paths with a blob name.
This function processes a list of source path entries replaces any asterisk (`$BLOB_NAME`)
in the source path with the given `blob_name`, and returns the fully resolved source paths.
Args:
paths_list (list[dict]): A list of dictionaries, each containing:
- 'src_path' (str): The source path, potentially with ($BLOB_NAME).
blob_names (list[str]): A list of blob names to replace the $BLOB_NAME in the source paths.
Returns:
list[str]: A list of updated source paths with $BLOB_NAME replaced.
"""
logging.info(
"Starting csv_src_path_handler with paths_list: %s and blob_names: %s",
paths_list,
blob_names,
)
updated_src_paths = []
for blob_name in blob_names:
for path in paths_list:
src_path = path["src_path"]
assert "$BLOB_NAME" in src_path, f"$BLOB_NAME not found in {src_path}"
if "$BLOB_NAME" in src_path:
src_path = src_path.replace("$BLOB_NAME", blob_name)
updated_src_paths.append(src_path)
continue
logging.info("Updated source paths: %s", updated_src_paths)
assert updated_src_paths is not None
return updated_src_paths
def csv_dest_path_handler(paths_list, blob_names):
"""csv_des_path_handler function.
Replaces $BLOB_NAME in destination paths with a blob name.
This function processes a list of destination path entries, replaces the
`$BLOB_NAME` placeholder in each destination path with the provided `blob_name`,
and returns the updated destination paths.
Args:
paths_list (list[dict]): A list of dictionaries, each containing:
- 'dest_path' (str): The destination path, potentially containing `$BLOB_NAME`.
blob_name blob_names (list[str]): A list of blob names to replace the $BLOB_NAME in
the source paths.
Returns:
list[str]: A list of updated destination paths with $BLOB_NAME replaced.
"""
logging.info(
"Starting csv_dest_path_handler with paths_list: %s and blob_names: %s",
paths_list,
blob_names,
)
updated_dest_paths = []
for blob_name in blob_names:
for path in paths_list:
dest_path = path["dest_path"]
assert "$BLOB_NAME" in dest_path, f"$BLOB_NAME not found in {dest_path}"
if "$BLOB_NAME" in dest_path:
dest_path = dest_path.replace("$BLOB_NAME", blob_name)
updated_dest_paths.append(dest_path)
continue
logging.info("Updated Destination paths: %s", updated_dest_paths)
assert updated_dest_paths is not None
return updated_dest_paths
- Create a
cp_blob.py
file contains functions related to the copy process
"""Module to manage file transfer to/from Azure Blob Storage."""
import logging
from urllib.parse import urlparse
from azure.identity import ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
def list_src_blob() -> list[str]:
"""list_src_blob function.
Lists blobs in the source Azure Blob Storage container.
Returns:
list[str]: A list of blob names found in the source container.
"""
blob_names = []
storage_account_url = "https://STORAGE_ACCOUNT_NAME.blob.core.windows.net"
container_name = "source"
client_id = "REPLACE_WITH_MANAGED_IDENTITY_CLIET_ID_CREATED"
credential = ManagedIdentityCredential(client_id=client_id)
# Connect to the Blob Service Client using Managed Identity
blob_service_client = BlobServiceClient(
account_url=storage_account_url, credential=credential
)
logging.info("Getting container client for %s", container_name)
container_client = blob_service_client.get_container_client(container_name)
logging.info("Container client created successfully.%s", container_client)
logging.info(" Listing blobs in container: %s", container_name)
for blob in container_client.list_blobs():
logging.info("Found blob: %s", blob.name)
blob_name = blob.name
blob_name = blob_name.split("/", -1)
if len(blob_name) > 1:
blob_names.append(blob_name[-1])
logging.info("Blob names extracted: %s", blob_names)
assert blob_names is not None
return blob_names
def cp_blob(updated_src_paths, updated_dest_paths):
"""cp_blob function.
Copies blobs from source to destination using Azure Blob Storage URLs.
This function takes lists of fully-formed source and destination blob URLs
creates `BlobClient` instances, and initiates asynchronous blob copy
operations from source to destination.
Args:
updated_src_paths (list[str]): List of complete source blob URLs.
updated_dest_paths (list[str]): List of complete destination blob URLs.
Returns:
None
"""
client_id = "REPLACE_WITH_MANAGED_IDENTITY_CLIET_ID_CREATED"
credential = ManagedIdentityCredential(client_id=client_id)
blob_service_client = BlobServiceClient(
account_url="https://STORAGE_ACCOUNT_NAME.blob.core.windows.net",
credential=credential,
)
logging.info("Start looping: updated_src_paths and updated_dest_paths")
for src_path, dest_path in zip(updated_src_paths, updated_dest_paths):
try:
logging.info(
"Source Path: %s, Destination Path: %s.",
src_path,
dest_path,
)
src_blob_info = urlparse(src_path)
logging.info("setting src_blob_info: %s", src_blob_info)
dest_blob_info = urlparse(dest_path)
logging.info("setting dest_blob_info: %s", dest_blob_info)
src_container = src_blob_info.path.split("/")[1]
logging.info("fetching src_container: %s", src_container)
src_blob_name = "/".join(src_blob_info.path.split("/")[2:])
logging.info("fetching src_blob_name: %s", src_blob_name)
dest_container = dest_blob_info.path.split("/")[1]
logging.info("fetching dest_container: %s", dest_container)
dest_blob_name = "/".join(dest_blob_info.path.split("/")[2:])
logging.info("fetching dest_blob_name: %s", dest_blob_name)
source_blob_client = blob_service_client.get_blob_client(
container=src_container, blob=src_blob_name
)
logging.info("build blob client for source: %s", source_blob_client)
dest_blob_client = blob_service_client.get_blob_client(
container=dest_container, blob=dest_blob_name
)
logging.info("build blob client for destination: %s", dest_blob_client)
source_blob_url = source_blob_client.url
if source_blob_client.exists():
copy_props = dest_blob_client.start_copy_from_url(source_blob_url)
logging.info("Copy started. Copy ID: %s.", copy_props["copy_id"])
else:
logging.warning("Source blob does not exist: %s", src_blob_name)
except Exception as e:
logging.error("An error occurred while copying the blob: %s", e)
- Update The
function_app.py
file as follows:
"""cp Blob - File transfer Integration Project
The is the main entry point for cp Blob Azure function for the file transfer project.
It is triggered by Time Schedule event. Then, Starts manage file transfer based on
Data Catalog CSV file composed of the source and destination paths.
The Function built on the Azure Functions Python SDK. The Function source-code following
Python PEP standards. it's code is well-documented, and function based with clear separation
of concerns, check out the called functions at the same working dir level i.e. couldn't use
utils directory for listing function as Azure function don't upload dirs.
the function is designed to be modular and reusable, allowing for easy integration and
future enhancements with other components of the fire transfer project.
For future enhancements, Please remember "a function do one thing only and do it well".
Examples:
This example provides how end users can use the function to manage file transefer to/from blobs.
for more info check the cp Blob file transfer project documentation.
Copy a blob from source to destination:
- Open Azure portal, Navigate to the Storage account to which source directory you wanna copy from.
- On the right three dots, copy the URL of the source directory, Past it at data_catalog.csv file
under the full_source_path column into dobule quotes.
- Do the same for the Destination directory.
- Save.
"""
import logging
import azure.functions as func
from cp_blob import cp_blob, list_src_blob
from csv_reader import csv_dest_path_handler, csv_reader, csv_src_path_handler
app = func.FunctionApp()
@app.schedule(
schedule="0 */5 * * * *]", arg_name="myTimer", run_on_startup=True, use_monitor=False
)
def eraki_time_trigger_func_demo(myTimer: func.TimerRequest) -> func.HttpResponse:
"""eraki_time_trigger_func_demo function.
This is the main entry point for cp Blob Azure function for the file transfer project.
It is triggered Time Schedule event. Then, Starts manage file transfer based on
Data Catalog CSV file composed of the source and destination paths.
Args:
schedule (cron time schedule): cron time schedule for the function to run.
Returns:
func.HttpResponse: HTTP response indicating success or failure of the operation.
"""
try:
logging.info("Python time trigger function processed a request.")
blob_names = list_src_blob()
paths_list = csv_reader()
updated_src_paths = csv_src_path_handler(paths_list, blob_names)
# Call teh CSV destination paths
updated_dest_paths = csv_dest_path_handler(paths_list, blob_names)
# Call the cp_blob function to copy the blob
cp_blob(updated_src_paths, updated_dest_paths)
return func.HttpResponse("Blob copied successfully", status_code=200)
except Exception as e:
return func.HttpResponse(f"Error: {str(e)}", status_code=500)
- Update the
requirements.txt
file as follows
azure-functions
azure-storage-blob
Deploy Time Trigger function to Azure Function App
As we went through such steps in the previous article, we won’t repeat such steps here; follow the link below to deploy: deploy-the-http-function-to-azure-function-app
Or, you can even deploy by using Azure CLI command, first ensure that you’re logged in by az login
- Zip your code
zip -r eraki_time_trigger_func_demo.zip .
# list zipped files
unzip -l eraki_time_trigger_func_demo.zip
- Deploy the zipped file
az functionapp deployment source config-zip \
--resource-group <YourResourceGroup> \
--name erakitimetriggerfuncdemo\
--src eraki_time_trigger_func_demo.zip
func azure functionapp publish DataConnectornortheurtest1001 --publish-local-settings -i
- Fetch deployed function settings
func azure functionapp fetch-app-settings <FunctionAppName> --resource-group <ResourceGroupName>
Resources
That's it, Very straightforward, very fast🚀. Hope this article inspired you and will appreciate your feedback. Thank you
Subscribe to my newsletter
Read articles from Mohamed El Eraki directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by

Mohamed El Eraki
Mohamed El Eraki
Cloud & DevOps Engineer, Linux & Windows SysAdmin, PowerShell, Bash, Python Scriptwriter, Passionate about DevOps, Autonomous, and Self-Improvement, being DevOps Expert is my Aim.