Using Background Tasks with Celery in Flask

Flask India

Introduction:

Today's digital world runs on RESTful API integrations, and nearly every programming language has a library or framework that helps developers build them. Python is no exception: frameworks such as Flask, Django, and FastAPI assist developers with RESTful API development.

Using our earlier tutorials here, you can quickly get up and running by creating a Flask API.

  1. Create your first Flask App.

  2. Run your Flask App.

  3. Validating your Flask RESTful API.

  4. Logging your Flask App.

Why do we need background tasks?

Although the purpose of RESTful APIs is to facilitate the easy movement of data between two systems, there are also use cases where an API is used for activities that need some processing time because they depend on external factors such as I/O, network bandwidth, or raw compute.

At times like these, it is better to offload the time-consuming activity to be processed in the background, also known as asynchronous processing.

This is typically done by placing the request on a background queue, where a worker then picks it up and processes it. The background queue is also known as a task queue.
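
To make the idea concrete, here is a minimal in-process sketch of a task queue that uses only the Python standard library. It is not how Celery works internally (Celery hands tasks to separate worker processes through a message broker), and the names slow_task and worker are made up for illustration.

```python
import queue
import threading
import time

task_queue = queue.Queue()   # the "task queue"
results = []                 # filled in by the worker, for demonstration


def slow_task(n):
    """Stand-in for time-consuming work (I/O, network, compute)."""
    time.sleep(0.01)
    return n * n


def worker():
    """Pull tasks off the queue until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        results.append(slow_task(item))
        task_queue.task_done()


worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

# The "producer" returns immediately after enqueueing each task.
for n in (1, 2, 3):
    task_queue.put(n)

task_queue.put(None)   # tell the worker to stop
task_queue.join()      # wait until every queued item has been processed
print(results)         # [1, 4, 9]
```

The producer only pays the cost of put(); the slow work happens on the worker's side, which is the same division of labour a Flask API and a Celery worker have.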

What are some popular Task Queueing Libraries?

We have a few of them such as Celery, Dramatiq, and Huey.

Documentation links for the same are as below:

  1. Celery

  2. Dramatiq

  3. Huey

For the scope of this article, we explore a popular option in the Python world, Celery.

What is Celery?

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

The description above comes from the official Celery documentation. Celery is one of the oldest Python libraries for asynchronous processing.

Let’s get started!

We create a simple Flask App and a boilerplate for the Celery consumer. The idea here is that the Flask App acts as the producer, generating the metadata for the task to be processed asynchronously in the background. The consumer accepts this and processes it accordingly. This keeps our API free to take further requests without being blocked.

We also need a Message Broker to store our messages to be processed asynchronously i.e. in the background. There are a number of message brokers such as RabbitMQ, Redis, Apache Kafka, etc. For our example, we will be using the RabbitMQ message broker.
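
A broker connection is usually described by a single URL. As a rough sketch, the snippet below uses the standard library to split the broker URL that appears in this article's script into its parts. The fallback port 5672 is the default AMQP port and is not written in the URL itself.

```python
from urllib.parse import urlsplit

# The broker URL used in this article's script.
broker_url = 'amqp://mqadmin:mqadminpass@localhost/mq_admin_vhost'

parts = urlsplit(broker_url)

scheme = parts.scheme            # transport protocol
username = parts.username        # RabbitMQ user
host = parts.hostname
port = parts.port or 5672        # no port in the URL, so fall back to the AMQP default
vhost = parts.path.lstrip('/')   # RabbitMQ virtual host

print(scheme, username, host, port, vhost)
# amqp mqadmin localhost 5672 mq_admin_vhost
```

These are the values the RabbitMQ instance must be configured with (user, password, and virtual host) for the connection to succeed.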

We have set up our RabbitMQ broker using Docker containers.

Docker is a software platform that allows you to build, test, and deploy applications quickly. Docker packages software into standardized units called containers that have everything the software needs to run including libraries, system tools, code, and runtime.

For our demo, we will be downloading images in the background from any valid URL. Our worker will do the background processing while our Flask API is free to accept further requests.

Let’s create our Python file as below.

# Import Modules:

import os
import uuid
import traceback
from flask import Flask, jsonify, request
from celery import Celery
import requests
from marshmallow import Schema, fields, ValidationError

app = Flask(__name__)

# Configure Celery to use RabbitMQ:

app.config['CELERY_BROKER_URL'] = 'amqp://mqadmin:mqadminpass@localhost/mq_admin_vhost'

celery_app = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery_app.conf.update(app.config)

# Marshmallow API validation:

class UrlSchema(Schema):
    url = fields.Url(required=True)

headers = {'Content-Type': 'application/json'}

@celery_app.task
def download_image_from_url(url, file):
    try:
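        # A timeout keeps the worker from hanging on an unresponsive server
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()  # Raise HTTP errors, if any

        print(f'response type: {response.headers}')

        save_path = os.path.dirname(os.path.abspath(__file__))
        directory = f'{save_path}/images'

        if not os.path.isdir(directory):
            os.makedirs(directory, exist_ok=True)

        # Ignore parameters such as "; charset=..." when reading the MIME type
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
        if content_type in ['image/jpeg', 'image/jpg']:
            file_name = f'{directory}/{file}.jpg'
        elif content_type == 'image/png':
            file_name = f'{directory}/{file}.png'
        else:
            # Without this branch, file_name would be undefined below
            raise ValueError(f'Unsupported content type: {content_type}')

        # Use a separate name for the handle so the `file` argument is not shadowed
        with open(file_name, 'wb') as image_file:
            for chunk in response.iter_content(1024):
                image_file.write(chunk)
        return f"Image downloaded successfully: {directory}"

    except Exception:
        traceback.print_exc()
        raise  # Re-raise so Celery marks the task as failed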

@app.route('/add-task', methods=['POST'])
def add_task():
    try:
        data = request.get_json()

        # Validate the payload; raises ValidationError on bad input
        UrlSchema().load(data)

        url = data.get('url')

        # Call the task asynchronously:
        file = str(uuid.uuid1())
        task = download_image_from_url.apply_async(
            args=[url, file]
        )
        response = {'task_id': task.id, 'status': 'Task submitted'}
        return jsonify(response), 202, headers

    except ValidationError as e:
        return e.messages, 400, headers

    except Exception as e:
        traceback.print_exc()
        response = {
            'message': 'unable to process request',
            'reason': 'internal server error'
        }
        return jsonify(response), 500, headers

if __name__ == '__main__':
    # print(os.path.dirname(os.path.abspath(__file__)))
    app.run(host='0.0.0.0', port=8000, debug=True)

Note: We pass stream=True so that the media file is downloaded in chunks instead of being loaded into memory all at once.
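
The chunked write can be sketched on its own, without any network access. In the snippet below, save_chunks is a hypothetical helper (it is not part of the script above) and the byte strings stand in for what response.iter_content(1024) would yield.

```python
import os
import tempfile


def save_chunks(chunks, path):
    """Write an iterable of byte chunks to path; return total bytes written."""
    written = 0
    with open(path, 'wb') as fh:
        for chunk in chunks:
            if chunk:  # skip empty chunks
                fh.write(chunk)
                written += len(chunk)
    return written


# Simulated chunks; with requests this would be response.iter_content(1024)
fake_chunks = [b'a' * 1024, b'b' * 1024, b'c' * 10]

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'demo.bin')
    total = save_chunks(fake_chunks, target)
    size_on_disk = os.path.getsize(target)

print(total, size_on_disk)  # 2058 2058
```

Only one chunk is held in memory at a time, which is why this pattern suits large media files.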

Running our Flask App and Celery App

Let’s create and activate our virtual environment as below.

➜  flask_background_tasks python3 -m venv venv
➜  flask_background_tasks . ./venv/bin/activate
(venv) ➜  flask_background_tasks

Let’s install our project requirements and save them to our requirements.txt file.

(venv) ➜  pip install flask celery requests marshmallow
Collecting flask
  Using cached flask-3.1.0-py3-none-any.whl (102 kB)
Collecting celery
  Using cached celery-5.4.0-py3-none-any.whl (425 kB)
Collecting requests
  Using cached requests-2.32.3-py3-none-any.whl (64 kB)
Collecting marshmallow
  Using cached marshmallow-3.23.1-py3-none-any.whl (49 kB)
Collecting Werkzeug>=3.1
  Using cached werkzeug-3.1.3-py3-none-any.whl (224 kB)
Collecting itsdangerous>=2.2
  Using cached itsdangerous-2.2.0-py3-none-any.whl (16 kB)
Collecting blinker>=1.9
  Using cached blinker-1.9.0-py3-none-any.whl (8.5 kB)
Collecting click>=8.1.3
  Using cached click-8.1.7-py3-none-any.whl (97 kB)
Collecting Jinja2>=3.1.2
  Using cached jinja2-3.1.4-py3-none-any.whl (133 kB)
Collecting importlib-metadata>=3.6
  Using cached importlib_metadata-8.5.0-py3-none-any.whl (26 kB)
Collecting tzdata>=2022.7
  Using cached tzdata-2024.2-py2.py3-none-any.whl (346 kB)
Collecting vine<6.0,>=5.1.0
  Using cached vine-5.1.0-py3-none-any.whl (9.6 kB)
Collecting click-repl>=0.2.0
  Using cached click_repl-0.3.0-py3-none-any.whl (10 kB)
Collecting python-dateutil>=2.8.2
  Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Collecting click-plugins>=1.1.1
  Using cached click_plugins-1.1.1-py2.py3-none-any.whl (7.5 kB)
Collecting click-didyoumean>=0.3.0
  Using cached click_didyoumean-0.3.1-py3-none-any.whl (3.6 kB)
Collecting kombu<6.0,>=5.3.4
  Using cached kombu-5.4.2-py3-none-any.whl (201 kB)
Collecting billiard<5.0,>=4.2.0
  Using cached billiard-4.2.1-py3-none-any.whl (86 kB)
Collecting idna<4,>=2.5
  Using cached idna-3.10-py3-none-any.whl (70 kB)
Collecting charset-normalizer<4,>=2
  Using cached charset_normalizer-3.4.0-cp39-cp39-macosx_11_0_arm64.whl (120 kB)
Collecting urllib3<3,>=1.21.1
  Using cached urllib3-2.2.3-py3-none-any.whl (126 kB)
Collecting certifi>=2017.4.17
  Using cached certifi-2024.8.30-py3-none-any.whl (167 kB)
Collecting packaging>=17.0
  Using cached packaging-24.2-py3-none-any.whl (65 kB)
Collecting prompt-toolkit>=3.0.36
  Using cached prompt_toolkit-3.0.48-py3-none-any.whl (386 kB)
Collecting zipp>=3.20
  Using cached zipp-3.21.0-py3-none-any.whl (9.6 kB)
Collecting MarkupSafe>=2.0
  Using cached MarkupSafe-3.0.2-cp39-cp39-macosx_11_0_arm64.whl (12 kB)
Collecting amqp<6.0.0,>=5.1.1
  Using cached amqp-5.3.1-py3-none-any.whl (50 kB)
Collecting typing-extensions==4.12.2
  Using cached typing_extensions-4.12.2-py3-none-any.whl (37 kB)
Collecting wcwidth
  Using cached wcwidth-0.2.13-py2.py3-none-any.whl (34 kB)
Collecting six>=1.5
  Downloading six-1.17.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: wcwidth, vine, zipp, tzdata, typing-extensions, six, prompt-toolkit, MarkupSafe, click, amqp, Werkzeug, urllib3, python-dateutil, packaging, kombu, Jinja2, itsdangerous, importlib-metadata, idna, click-repl, click-plugins, click-didyoumean, charset-normalizer, certifi, blinker, billiard, requests, marshmallow, flask, celery
Successfully installed Jinja2-3.1.4 MarkupSafe-3.0.2 Werkzeug-3.1.3 amqp-5.3.1 billiard-4.2.1 blinker-1.9.0 celery-5.4.0 certifi-2024.8.30 charset-normalizer-3.4.0 click-8.1.7 click-didyoumean-0.3.1 click-plugins-1.1.1 click-repl-0.3.0 flask-3.1.0 idna-3.10 importlib-metadata-8.5.0 itsdangerous-2.2.0 kombu-5.4.2 marshmallow-3.23.1 packaging-24.2 prompt-toolkit-3.0.48 python-dateutil-2.9.0.post0 requests-2.32.3 six-1.17.0 typing-extensions-4.12.2 tzdata-2024.2 urllib3-2.2.3 vine-5.1.0 wcwidth-0.2.13 zipp-3.21.0
(venv) ➜  flask_background_tasks 
(venv) ➜  flask_background_tasks pip freeze > requirements.txt

Our requirements.txt file is as follows:

amqp==5.3.1
billiard==4.2.1
blinker==1.9.0
celery==5.4.0
certifi==2024.8.30
charset-normalizer==3.4.0
click==8.1.7
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
Flask==3.1.0
idna==3.10
importlib_metadata==8.5.0
itsdangerous==2.2.0
Jinja2==3.1.4
kombu==5.4.2
MarkupSafe==3.0.2
marshmallow==3.23.1
packaging==24.2
prompt_toolkit==3.0.48
python-dateutil==2.9.0.post0
requests==2.32.3
six==1.17.0
typing_extensions==4.12.2
tzdata==2024.2
urllib3==2.2.3
vine==5.1.0
wcwidth==0.2.13
Werkzeug==3.1.3
zipp==3.21.0

We also create an images directory to save our images.

(venv) ➜  flask_background_tasks mkdir images

This is our project directory structure.

➜  flask_background_tasks ls -lah
total 32
drwxr-xr-x  7 flask-india  staff   224B Nov 17 23:22 .
drwxr-xr-x  8 flask-india  staff   256B Nov 17 13:55 ..
-rw-r--r--@ 1 flask-india  staff   6.0K Dec  1 22:27 .DS_Store
-rw-r--r--@ 1 flask-india  staff   3.0K Dec  8 20:15 flask_app.py
drwxr-xr-x  4 flask-india  staff   128B Nov 17 23:22 images
-rw-r--r--  1 flask-india  staff   523B Nov 17 22:10 requirements.txt
drwxr-xr-x  6 flask-india  staff   192B Nov 17 21:44 venv

We will run our Flask app and our Celery worker separately, each in its own terminal.

(venv) ➜  flask_background_tasks python flask_app.py 
 * Serving Flask app 'flask_app'
 * Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8000
 * Running on http://192.168.0.115:8000
Press CTRL+C to quit
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 794-535-064

In a second terminal, start the Celery worker:

(venv) ➜  flask_background_tasks celery -A flask_app.celery_app worker --loglevel=info
/Users/flask-india/Desktop/flask_india_blog/articles/flask_background_tasks/venv/lib/python3.9/site-packages/urllib3/__init__.py:35: NotOpenSSLWarning: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'LibreSSL 2.8.3'. See: https://github.com/urllib3/urllib3/issues/3020
  warnings.warn(

 -------------- celery@flask-india.local v5.4.0 (opalescent)
--- ***** ----- 
-- ******* ---- macOS-14.6-arm64-arm-64bit 2024-12-09 00:00:42
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         flask_app:0x106bed640
- ** ---------- .> transport:   amqp://mqadmin:**@localhost:5672/mq_admin_vhost
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . flask_app.download_image_from_url

[2024-12-09 00:00:42,745: WARNING/MainProcess] /Users/flask-india/Desktop/flask_india_blog/articles/flask_background_tasks/venv/lib/python3.9/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-12-09 00:00:42,752: INFO/MainProcess] Connected to amqp://mqadmin:**@127.0.0.1:5672/mq_admin_vhost
[2024-12-09 00:00:42,753: WARNING/MainProcess] /Users/flask-india/Desktop/flask_india_blog/articles/flask_background_tasks/venv/lib/python3.9/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-12-09 00:00:42,756: INFO/MainProcess] mingle: searching for neighbors
[2024-12-09 00:00:43,780: INFO/MainProcess] mingle: all alone
[2024-12-09 00:00:43,807: INFO/MainProcess] celery@flask-india.local ready.

Let’s test our Flask API and Celery Worker to see it in action. Use the curl request below to trigger an API request, which will create a background task for our worker to process asynchronously.

Note: A marshmallow schema guards our API and rejects invalid input.

curl --location 'http://localhost:8000/add-task' \
--header 'Content-Type: application/json' \
--data '{
    "url": "https://picsum.photos/200/300"
}'

You should see output similar to the following on your worker.

[2025-07-06 21:57:18,114: INFO/MainProcess] Task flask_app.download_image_from_url[ead75fd1-d9fb-41fc-bc75-bc00aac69b7e] received
[2025-07-06 21:57:19,107: WARNING/ForkPoolWorker-8] response type: {'Connection': 'keep-alive', 'Content-Length': '13078', 'Server': 'nginx', 'Content-Type': 'image/jpeg', 'Cache-Control': 'public, max-age=2592000, stale-while-revalidate=60, stale-if-error=43200, immutable', 'Content-Disposition': 'inline; filename="99-200x300.jpg"', 'Picsum-Id': '99', 'Timing-Allow-Origin': '*', 'Accept-Ranges': 'bytes', 'Age': '1692863', 'Date': 'Sun, 06 Jul 2025 16:27:19 GMT', 'Via': '1.1 varnish', 'X-Served-By': 'cache-bom4738-BOM', 'X-Cache': 'HIT', 'X-Cache-Hits': '0', 'X-Timer': 'S1751819239.083826,VS0,VE1', 'Vary': 'Origin'}
[2025-07-06 21:57:19,114: INFO/ForkPoolWorker-8] Task flask_app.download_image_from_url[ead75fd1-d9fb-41fc-bc75-bc00aac69b7e] succeeded in 0.9992664590000686s: 'Image downloaded successfully: /Users/flask-india/Desktop/flask_india_blog/articles/flask_background_tasks/images'
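
The Content-Type header in the log above (image/jpeg) is what the task uses to choose a file extension. A minimal sketch of that lookup follows; extension_for and the EXTENSIONS table are illustrative names, and the table covers only the types handled in this article.

```python
from typing import Optional

# Only the types handled by the task in this article.
EXTENSIONS = {
    'image/jpeg': '.jpg',
    'image/jpg': '.jpg',
    'image/png': '.png',
}


def extension_for(content_type: Optional[str]) -> Optional[str]:
    """Return a file extension for a Content-Type header value, or None."""
    if not content_type:
        return None
    # Drop parameters such as "; charset=binary" and normalize case
    mime_type = content_type.split(';', 1)[0].strip().lower()
    return EXTENSIONS.get(mime_type)


print(extension_for('image/jpeg'))                  # .jpg
print(extension_for('IMAGE/PNG; charset=binary'))   # .png
print(extension_for('text/html'))                   # None
```

Returning None for unknown types lets the caller decide whether to reject the download or fall back to a default extension.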

Our downloaded image has been saved in our images directory with the help of the following code snippet in our worker:

save_path = os.path.dirname(os.path.abspath(__file__))
directory = f'{save_path}/images'

if not os.path.isdir(directory):
    os.makedirs(directory, exist_ok=True)

Note: This creates the images directory in the same folder as the Python file that contains our Flask app and Celery worker. In this demo both live in a single file; in real-world projects they are typically kept in separate files.
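
The same directory logic can also be written with pathlib. This is only an alternative sketch; ensure_images_dir is a hypothetical helper, and in the article's script you would pass __file__ as the argument.

```python
import tempfile
from pathlib import Path


def ensure_images_dir(script_path):
    """Return <directory of script_path>/images, creating it if needed."""
    directory = Path(script_path).resolve().parent / 'images'
    directory.mkdir(parents=True, exist_ok=True)  # no error if it already exists
    return directory


# Demonstrate with a throwaway location instead of a real project
with tempfile.TemporaryDirectory() as tmp:
    fake_script = Path(tmp) / 'flask_app.py'
    images_dir = ensure_images_dir(fake_script)
    created = images_dir.is_dir()
    dir_name = images_dir.name
    again = ensure_images_dir(fake_script)  # second call changes nothing
    same = again == images_dir

print(created, dir_name, same)  # True images True
```

Because mkdir is called with exist_ok=True, no separate existence check is needed before creating the directory.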

Using Custom Queues with Flask:

So far we have let Celery and RabbitMQ handle the creation and management of the queue for our background image-downloading tasks. However, we can give these tasks a dedicated queue of their own, which makes them easier to monitor separately. This is a common requirement in an actual production environment.

We can attach our worker to a dedicated queue using the CLI argument below:

celery -A flask_app.celery_app worker --loglevel=info -Q image_download

Note: -Q (long form: --queues) names the queue that our Celery worker consumes from.

You should see output similar to the following:

 -------------- celery@flask-india.local v5.4.0 (opalescent)
--- ***** ----- 
-- ******* ---- macOS-14.6-arm64-arm-64bit 2025-07-06 22:18:32
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         flask_app:0x104deb9d0
- ** ---------- .> transport:   amqp://mqadmin:**@localhost:5672/mq_admin_vhost
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> image_download   exchange=image_download(direct) key=image_download


[tasks]
  . flask_app.download_image_from_url

[2025-07-06 22:18:32,226: INFO/MainProcess] Connected to amqp://mqadmin:**@127.0.0.1:5672/mq_admin_vhost
[2025-07-06 22:18:32,227: WARNING/MainProcess] /Users/flask-india/Desktop/flask_india_blog/articles/flask_background_tasks/venv/lib/python3.9/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2025-07-06 22:18:32,232: INFO/MainProcess] mingle: searching for neighbors
[2025-07-06 22:18:33,255: INFO/MainProcess] mingle: all alone
[2025-07-06 22:18:33,300: INFO/MainProcess] celery@flask-india.local ready.

The queue our worker is connected to is also listed under [queues] in the output above.

Our RabbitMQ dashboard also shows the same.

Now that the consumer part is sorted, let’s make changes to our producer, which is represented in our API as follows.

task = download_image_from_url.apply_async(
    args=[url, file],
    queue='image_download'
)

Note: This is the only change needed in our Python script, which houses both our API and Celery worker.
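
If passing queue= on every call becomes repetitive, Celery's task_routes setting can map a task name to a queue once, in configuration. The fragment below is a sketch only. It assumes the task is registered as flask_app.download_image_from_url (the name shown in the worker's [tasks] banner) and it has not been run against this article's exact setup.

```python
from celery import Celery

celery_app = Celery(
    'flask_app',
    broker='amqp://mqadmin:mqadminpass@localhost/mq_admin_vhost',
)

# Route the task by name so callers need not pass queue= each time
celery_app.conf.task_routes = {
    'flask_app.download_image_from_url': {'queue': 'image_download'},
}
```

With a route like this in place, a plain apply_async(args=[url, file]) call should be delivered to the image_download queue, while an explicit queue= argument remains available for one-off overrides.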

Here is the complete Python script for reference.


# Import Modules:

import os
import uuid
import traceback
from flask import Flask, jsonify, request
from celery import Celery
import requests
from marshmallow import Schema, fields, ValidationError

app = Flask(__name__)

# Configure Celery to use RabbitMQ:

app.config['CELERY_BROKER_URL'] = 'amqp://mqadmin:mqadminpass@localhost/mq_admin_vhost'

celery_app = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery_app.conf.update(app.config)

# Marshmallow API validation:

class UrlSchema(Schema):
    url = fields.Url(required=True)

headers = {'Content-Type': 'application/json'}

@celery_app.task
def download_image_from_url(url, file):
    try:
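        # A timeout keeps the worker from hanging on an unresponsive server
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()  # Raise HTTP errors, if any

        print(f'response type: {response.headers}')

        save_path = os.path.dirname(os.path.abspath(__file__))
        directory = f'{save_path}/images'

        if not os.path.isdir(directory):
            os.makedirs(directory, exist_ok=True)

        # Ignore parameters such as "; charset=..." when reading the MIME type
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
        if content_type in ['image/jpeg', 'image/jpg']:
            file_name = f'{directory}/{file}.jpg'
        elif content_type == 'image/png':
            file_name = f'{directory}/{file}.png'
        else:
            # Without this branch, file_name would be undefined below
            raise ValueError(f'Unsupported content type: {content_type}')

        # Use a separate name for the handle so the `file` argument is not shadowed
        with open(file_name, 'wb') as image_file:
            for chunk in response.iter_content(1024):
                image_file.write(chunk)

        return f"Image downloaded successfully: {directory}"

    except Exception:
        traceback.print_exc()
        raise  # Re-raise so Celery marks the task as failed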

@app.route('/add-task', methods=['POST'])
def add_task():
    try:
        data = request.get_json()

        UrlSchema().load(data)

        # Call the task asynchronously:

        url = data.get('url')
        file = str(uuid.uuid1())
        task = download_image_from_url.apply_async(
            args=[url, file],
            queue='image_download'
        )
        response = {'task_id': task.id, 'status': 'Task submitted'}
        return jsonify(response), 202, headers

    except ValidationError as e:
        return e.messages, 400, headers

    except Exception as e:
        traceback.print_exc()
        response = {
            'message': 'unable to process request',
            'reason': 'internal server error'
        }
        return jsonify(response), 500, headers

if __name__ == '__main__':
    # print(os.path.dirname(os.path.abspath(__file__)))
    app.run(host='0.0.0.0', port=8000, debug=True)

Let’s test our app again with the same curl request as before. You should see output similar to the following.

[2025-07-06 22:27:53,120: INFO/MainProcess] Task flask_app.download_image_from_url[83fa58da-0906-4752-847b-b2e64ae57f98] received
[2025-07-06 22:27:54,115: WARNING/ForkPoolWorker-8] response type: {'Connection': 'keep-alive', 'Content-Length': '11975', 'Server': 'nginx', 'Content-Type': 'image/jpeg', 'Cache-Control': 'public, max-age=2592000, stale-while-revalidate=60, stale-if-error=43200, immutable', 'Content-Disposition': 'inline; filename="1033-200x300.jpg"', 'Picsum-Id': '1033', 'Timing-Allow-Origin': '*', 'Accept-Ranges': 'bytes', 'Age': '395343', 'Date': 'Sun, 06 Jul 2025 16:57:54 GMT', 'Via': '1.1 varnish', 'X-Served-By': 'cache-bom4751-BOM', 'X-Cache': 'HIT', 'X-Cache-Hits': '0', 'X-Timer': 'S1751821074.148421,VS0,VE1', 'Vary': 'Origin'}
[2025-07-06 22:27:54,119: INFO/ForkPoolWorker-8] Task flask_app.download_image_from_url[83fa58da-0906-4752-847b-b2e64ae57f98] succeeded in 0.998634582999955s: 'Image downloaded successfully: /Users/flask-india/Desktop/flask_india_blog/articles/flask_background_tasks/images'

Conclusion:

We have seen how to create a background task with Celery so that our Flask RESTful API stays non-blocking while the heavy processing happens in the background.

Written by

Flask India

We are a bunch of Developers who started to mentor beginners who started using Python and Flask in general in the Flask India Telegram Group. So this blog is a way to give it back to the community from where we have received guidance when we started as beginners.