Python Queue Example

Introduction
Python’s built-in queue module is a key tool when you need safe data passing between threads or processes. Yet many developers overlook how choosing the right queue type (FIFO, LIFO, or priority) can make or break your concurrency logic. When should you grab a Queue vs. a PriorityQueue, and how do you prevent deadlocks or lost data in busy worker threads?
The secret is understanding each queue’s behavior under load. By matching your data flow and locking strategy to the queue class, you avoid subtle bugs, improve throughput, and write clearer code. Let’s dive into concrete examples so you can pick the best queue for your scenario.
Basic Queue Usage
The simplest queue is the FIFO Queue. It guarantees first-in, first-out ordering and handles locking internally. You create one and share it across threads without extra locks.
from queue import Queue
q = Queue(maxsize=10) # limit capacity
q.put('task1')
task = q.get()
Key points:
- maxsize=0 means infinite capacity.
- put() blocks when full; get() blocks when empty.
- Use task_done() and join() for producer-consumer sync.
Tip: Always call task_done() after processing a get() to signal completion.
This pattern keeps producers from flooding consumers and helps you wait until all tasks finish with q.join().
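The blocking rules above are easy to see with the non-blocking variants put_nowait() and get_nowait(), which raise queue.Full or queue.Empty instead of waiting. A minimal sketch:

```python
import queue

# maxsize=2 bounds the queue, so a third put would normally block
q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')

try:
    q.put_nowait('c')  # queue is full: raises queue.Full instead of blocking
except queue.Full:
    print('queue is full')

first = q.get()  # 'a' comes out first (FIFO ordering)
q.task_done()    # signal that the item was processed
print(first)
```

Swapping put_nowait() for put(timeout=...) gives you the same non-blocking safety with a grace period.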
Threading with Queue
When you spawn worker threads, a Queue is your safest shareable structure. Threads call get() to pull work and put() to add results.
- Create a Queue.
- Start N worker threads targeting a worker function.
- Enqueue tasks.
- Call join() to block until all tasks are processed.
import threading
from queue import Queue

def worker(q):
    while True:
        item = q.get()
        if item is None:  # sentinel: time to exit
            break
        process(item)  # process() stands in for your task-handling function
        q.task_done()

q = Queue()
threads = []
for _ in range(4):
    t = threading.Thread(target=worker, args=(q,))
    t.start()
    threads.append(t)

for item in range(10):
    q.put(item)

q.join()  # wait until every task is marked done
for _ in threads:
    q.put(None)  # one sentinel per worker
for t in threads:
    t.join()
For more on threading patterns, see this Python threading example.
Multiprocessing Queue
The multiprocessing module offers its own Queue for process-safe communication. Unlike queue.Queue, this one uses pipes and locks under the hood.
from multiprocessing import Process, Queue

def worker(q):
    q.put('result from process')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # receives data from child
    p.join()
Practical tips:
- Avoid large objects; they can slow pickling.
- Use join() on processes and close() on queues when done.
- For many producers, consider a Manager().Queue().
This lets you parallelize CPU-bound tasks safely across cores.
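To illustrate the last tip, here is a minimal sketch of several producer processes sharing a Manager().Queue(); the producer function and its (pid, i) payload are illustrative placeholders:

```python
from multiprocessing import Manager, Process

def producer(q, pid):
    # Each producer pushes a few items tagged with its id
    for i in range(3):
        q.put((pid, i))

if __name__ == '__main__':
    with Manager() as manager:
        q = manager.Queue()  # proxy queue, safe to share among many producers
        procs = [Process(target=producer, args=(q, pid)) for pid in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        items = [q.get() for _ in range(q.qsize())]
        print(len(items))  # 4 producers x 3 items = 12
```

The manager runs the queue in a server process and hands each producer a proxy, which avoids the feeder-thread quirks of a plain multiprocessing.Queue when many writers are involved.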
Priority Queues
When order matters by priority, switch to PriorityQueue. Each entry is a tuple (priority, data).
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((2, 'low-priority task'))
pq.put((1, 'high-priority task'))
print(pq.get()[1]) # prints 'high-priority task'
Use cases:
- Scheduling jobs by deadline.
- Managing task priorities in web servers.
- Real-time event handling.
Note: Lower numbers come out first. Ties are not broken FIFO by default: entries with equal priority are ordered by comparing the data itself, which can even raise a TypeError if the data isn’t comparable. Add an incrementing counter to each tuple if you need stable FIFO order among equal priorities.
Mixing priorities with worker threads is as simple as swapping Queue for PriorityQueue.
Queues with asyncio
In async code, use asyncio.Queue. It integrates with the event loop and uses await instead of blocking calls.
import asyncio

async def producer(q):
    for i in range(5):
        await q.put(i)
        print(f'put {i}')

async def consumer(q):
    while True:
        item = await q.get()
        print(f'got {item}')
        q.task_done()

async def main():
    q = asyncio.Queue()
    prod = asyncio.create_task(producer(q))
    cons = asyncio.create_task(consumer(q))
    await prod
    await q.join()
    cons.cancel()

asyncio.run(main())
This pattern keeps your async producers and consumers in sync without threads.
Common Pitfalls and Tips
Even with built-in safety, queues can be misused:
- Forgetting task_done() leads to join() hanging.
- No sentinel values means threads never exit.
- Overfilled queues block producers unexpectedly.
Best practices:
- Always signal thread exit (e.g., a None sentinel).
- Choose a small maxsize to prevent memory bloat.
- Wrap get() in try/except if you use timeouts.
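The last tip looks like this in practice: a sketch of a worker that polls with a timeout and exits cleanly once the queue stays empty, instead of blocking forever:

```python
import queue
import threading

q = queue.Queue()
for i in range(3):
    q.put(i)

results = []

def drain():
    while True:
        try:
            # Block for at most 0.1s; queue.Empty tells us we're done
            item = q.get(timeout=0.1)
        except queue.Empty:
            break
        results.append(item)
        q.task_done()

t = threading.Thread(target=drain)
t.start()
t.join()
print(results)  # [0, 1, 2]
```

A timeout like this doubles as an exit signal when a sentinel is impractical, though a sentinel is more deterministic under bursty load.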
By spotting these issues early, you keep systems reliable when traffic spikes.
Conclusion
Python’s queue module is a powerful ally for managing concurrency. From simple FIFO queues to priority scheduling and async workflows, each class solves a clear need. Understanding when to use Queue, PriorityQueue, or asyncio.Queue helps you avoid deadlocks, lost data, and hanging threads or processes. Practicing with real examples and following the tips above will make your code robust and maintainable. Now it’s your turn: pick a queue type, build a small producer-consumer task, and see concurrency in action.
Ready to streamline your data pipelines? Experiment with queue types today and share your findings with your team!