[Python] Pipes vs. Queues for Flawless Data Communication in Multiprocessing


Understanding Process
Key characteristics of a process:
A process must contain a main thread.
Code, data, stack, and heap are independent for each process.
- This means sharing memory between processes is neither easy nor cheap.
CPU and memory space are independent for each process.
Communication between processes is done through pipes, files, and sockets.
- Switching between processes requires a context switch.
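To make the "independent memory" point concrete, here is a minimal sketch. The names counter, increment, and run_isolated_child are made up for this illustration. The child process changes a module-level variable, but the parent never sees that change, because each process works on its own copy.

```python
from multiprocessing import Process

counter = 0  # module-level variable; every process gets its own copy


def increment():
    # Runs in the child process and only touches the child's copy
    global counter
    counter += 1
    print("child sees counter =", counter)


def run_isolated_child():
    """Run increment() in a child process and return the parent's counter."""
    p = Process(target=increment)
    p.start()
    p.join()
    return counter  # still the parent's untouched value


if __name__ == "__main__":
    print("parent sees counter =", run_isolated_child())
```

The child prints 1 while the parent still reports 0, which is why the rest of this post relies on explicit sharing tools instead of plain variables.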
Sharing State
from multiprocessing import Process, current_process, Value
import os


def generate_update_number(v):
    # v is a multiprocessing.Value shared by every worker process
    for _ in range(50):
        # += is a read-modify-write, so hold the lock to avoid lost updates
        with v.get_lock():
            v.value += 1
    print(current_process().name, 'data', v)


def main():
    parent_process_id = os.getpid()
    print(f"Parent Process ID: {parent_process_id}")
    processes = list()
    share_value = Value('i', 0)  # shared C int, initial value 0
    for _ in range(1, 10):  # start 9 worker processes
        p = Process(target=generate_update_number, args=(share_value,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print('Final Data in parent process:', share_value)


if __name__ == '__main__':
    main()
Output:
Parent Process ID: 490
Process-1 data <Synchronized wrapper for c_int(50)>
Process-2 data <Synchronized wrapper for c_int(100)>
Process-3 data <Synchronized wrapper for c_int(150)>
Process-4 data <Synchronized wrapper for c_int(200)>
Process-5 data <Synchronized wrapper for c_int(250)>
Process-6 data <Synchronized wrapper for c_int(300)>
Process-7 data <Synchronized wrapper for c_int(350)>
Process-8 data <Synchronized wrapper for c_int(400)>
Process-9 data <Synchronized wrapper for c_int(450)>
Final Data in parent process: <Synchronized wrapper for c_int(450)>
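Using Value and Array from the multiprocessing package is one way to share data between processes. Each process adds 50 to the same shared variable, so the printed value grows by 50 per process and the 9 processes together reach 450. Because += on a shared value is a read-modify-write rather than an atomic operation, the increment should be done while holding v.get_lock() so that concurrent updates are not lost.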
share_value = Value('i', 0)
'i' means that the data type is a C int. For a single char, use Value('c', b'a'); in Python 3 the initial value must be a bytes object of length 1. There is no type code for str; to share a byte string, use Array('c', b'some str').
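As a quick check of the type codes, the sketch below creates a few shared objects and reads them back in the same process. The variable names are chosen only for this illustration, and 'd' (C double) is one more type code beyond the ones mentioned above.

```python
from multiprocessing import Value, Array

shared_int = Value('i', 0)               # C int
shared_float = Value('d', 3.14)          # C double
shared_char = Value('c', b'a')           # single C char: bytes of length 1
shared_bytes = Array('c', b'some str')   # array of C chars holding a byte string

print(shared_int.value)     # 0
print(shared_float.value)   # 3.14
print(shared_char.value)    # b'a'
print(shared_bytes.value)   # b'some str'
```

A str has to be encoded to bytes (for example with .encode()) before it is stored this way.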
Using Array
If your data is not a single value but an array, you can use from multiprocessing import Array.
share_numbers = Array('i', range(50))
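Here is a small sketch of how a shared Array might be updated from several processes. The function names double_in_place and run_array_demo are hypothetical. Each worker doubles every element while holding the array's lock, so with two workers every element ends up multiplied by four.

```python
from multiprocessing import Process, Array


def double_in_place(arr):
    # Hold the lock for the whole loop so the two workers do not interleave
    with arr.get_lock():
        for i in range(len(arr)):
            arr[i] *= 2


def run_array_demo():
    share_numbers = Array('i', range(5))  # starts as [0, 1, 2, 3, 4]
    workers = [Process(target=double_in_place, args=(share_numbers,))
               for _ in range(2)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    return share_numbers[:]  # slicing copies the contents into a plain list


if __name__ == "__main__":
    print(run_array_demo())  # [0, 4, 8, 12, 16]
```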
More Options
Since the Python 3.8 release, you can also use from multiprocessing import shared_memory. Another option is from multiprocessing import Manager.
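The following is a rough sketch of both options, not a full treatment. All function names are invented for the example. The Manager part shares a dict through a proxy, and the shared_memory part lets a child process write bytes into a block that the parent then reads.

```python
from multiprocessing import Manager, Process, shared_memory


def record_square(results, n):
    # The proxy forwards this write to the manager's server process
    results[n] = n * n


def run_manager_demo():
    with Manager() as manager:
        results = manager.dict()
        workers = [Process(target=record_square, args=(results, n))
                   for n in range(4)]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        return dict(results)  # copy out before the manager shuts down


def write_greeting(name):
    # Attach to the existing block by name and write into it directly
    shm = shared_memory.SharedMemory(name=name)
    shm.buf[:5] = b"hello"
    shm.close()


def run_shared_memory_demo():
    shm = shared_memory.SharedMemory(create=True, size=5)
    try:
        p = Process(target=write_greeting, args=(shm.name,))
        p.start()
        p.join()
        return bytes(shm.buf[:5])
    finally:
        shm.close()
        shm.unlink()  # the creator is responsible for freeing the block


if __name__ == "__main__":
    print(run_manager_demo())
    print(run_shared_memory_demo())
```

Manager is the more flexible choice because it can hold Python objects such as dicts and lists, while shared_memory works on raw bytes and avoids going through a server process.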
Pipe
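In multiprocessing, Pipe() returns a pair of connection objects, here a parent connection (parent_conn) and a child connection (child_conn), that are linked to each other. By default the pipe is duplex, so it supports two-way communication between exactly two endpoints (1 to 1).
Let's say Player A and Player B are playing tennis. If Player A sends the ball, Player B must receive it for the game to go on flawlessly. That's the rule, and it's the same for a pipe: whatever one end sends, the other end must receive.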
Code:
from multiprocessing import Process, Pipe, current_process
import time
import os


def worker(id, baseNum, conn):
    process_id = os.getpid()
    process_name = current_process().name
    sub_total = 0
    for i in range(baseNum):
        sub_total += 1
    # Send the result to the parent through the child's end of the pipe
    conn.send(sub_total)
    conn.close()
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result: {sub_total}')


def main():
    parent_process_id = os.getpid()
    print(f'Parent process id: {parent_process_id}')
    start_time = time.time()
    # Pipe() returns the two connected ends
    parent_conn, child_conn = Pipe()
    t = Process(name=str(1), target=worker, args=(1, 1000000, child_conn))
    t.start()
    # Joining before recv() works here because a single int fits in the
    # pipe's buffer; for large payloads, call recv() before join()
    t.join()
    print("--- %s seconds ---" % (time.time() - start_time))
    print('Main-processing total count: {}'.format(parent_conn.recv()))
    print('Main-processing done.')


if __name__ == '__main__':
    main()
Output:
Parent process id: 2340
Process ID: 2341, Process Name: 1
Result: 1000000
--- 0.16553831100463867 seconds ---
Main-processing total count: 1000000
Main-processing done.
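The example above only sends data in one direction. As a minimal sketch of the two-way "tennis" exchange, the code below sends a message to the child and waits for the reply on the same pipe. The names player_b and rally are made up for this illustration.

```python
from multiprocessing import Process, Pipe


def player_b(conn):
    ball = conn.recv()                      # wait for A's serve
    conn.send(ball + " -> returned by B")   # hit it back over the same pipe
    conn.close()


def rally():
    conn_a, conn_b = Pipe()  # duplex by default, so both ends can send and receive
    p = Process(target=player_b, args=(conn_b,))
    p.start()
    conn_a.send("serve from A")
    reply = conn_a.recv()    # blocks until B has answered
    p.join()
    return reply


if __name__ == "__main__":
    print(rally())  # serve from A -> returned by B
```

Every send() is matched by exactly one recv() on the other end. If one side waits on recv() for a message that is never sent, it blocks.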
Queue
If you have basic computer science knowledge, you may have heard of the queue data structure. One of its key characteristics is FIFO (first in, first out). Using the put() function, we can insert data into the Queue, and using get(), we can take items out of it. Unlike a pipe, a multiprocessing Queue can be shared by several producers and consumers.
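To make the FIFO behavior concrete, here is a minimal sketch with a single producer process. The names producer and consume_in_order are hypothetical. The parent receives the items in exactly the order the child put them.

```python
from multiprocessing import Process, Queue


def producer(q):
    for item in ["first", "second", "third"]:
        q.put(item)


def consume_in_order():
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    items = [q.get() for _ in range(3)]  # get() blocks until an item arrives
    p.join()
    return items


if __name__ == "__main__":
    print(consume_in_order())  # ['first', 'second', 'third']
```

With several producers, the order between items from different processes depends on timing, but each producer's own items still arrive in the order it put them.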
Code:
from multiprocessing import Process, Queue, current_process
import time
import os


def worker(id, baseNum, q):
    process_id = os.getpid()
    process_name = current_process().name
    sub_total = 0
    for i in range(baseNum):
        sub_total += 1
    # Put this worker's result on the shared queue
    q.put(sub_total)
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result: {sub_total}')


def main():
    parent_process_id = os.getpid()
    print(f'Parent process id: {parent_process_id}')
    processes = list()
    start_time = time.time()
    q = Queue()
    for i in range(5):
        t = Process(name=str(i), target=worker, args=(i, 1000000, q))
        processes.append(t)
        t.start()
    # Joining before draining the queue works here because each worker
    # puts only one small item; with large items, call get() before join()
    for process in processes:
        process.join()
    print("--- %s seconds ---" % (time.time() - start_time))
    # Sentinel that marks the end of the results
    q.put('exit')
    total = 0
    while True:
        tmp = q.get()
        if tmp == 'exit':
            break
        else:
            total += tmp
    print()
    print('Main-processing total count: {}'.format(total))
    print('Main-processing done.')


if __name__ == '__main__':
    main()
Output:
Parent process id: 387
Process ID: 391, Process Name: 3
Process ID: 390, Process Name: 2Result: 1000000
Result: 1000000
Process ID: 392, Process Name: 4
Result: 1000000
Process ID: 389, Process Name: 1
Result: 1000000
Process ID: 388, Process Name: 0
Result: 1000000
--- 1.0864899158477783 seconds ---
Main-processing total count: 5000000
Main-processing done.
You can see that the total count is 5000000, which is 5 processes times 1000000 each.
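As an alternative to the 'exit' sentinel, the parent can call get() exactly once per worker, because it knows how many results to expect. This is a sketch of that variation rather than the code used above. The names count_up and total_from_workers are made up, and the loop count is smaller to keep it quick.

```python
from multiprocessing import Process, Queue


def count_up(base_num, q):
    sub_total = 0
    for _ in range(base_num):
        sub_total += 1
    q.put(sub_total)


def total_from_workers(num_workers=5, base_num=1000):
    q = Queue()
    workers = [Process(target=count_up, args=(base_num, q))
               for _ in range(num_workers)]
    for p in workers:
        p.start()
    # Read one result per worker before joining, so no worker is left
    # waiting to flush its item into a full pipe
    total = sum(q.get() for _ in workers)
    for p in workers:
        p.join()
    return total


if __name__ == "__main__":
    print(total_from_workers())  # 5000
```

Draining the queue before join() follows the ordering the multiprocessing documentation recommends for processes that put items on a queue.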
Written by

Lim Woojae