Synchronization Mechanisms: Exploring Locks ,Semaphores and more ...
Concurrency Challenges in Microservices
In microservices architectures, concurrency stands as a substantial subject, encompassing numerous challenges that demand careful consideration and strategic solutions. those challenges include :
- Data Consistency :
this article Data: The Ultimate Hurdle in the Migration Journey to Microservices Architecture talk about more in details about this challenge
- Distributed Transactions :
the challenges of implementing distributed transactions in a microservice architecture are numerous :
Consistency
If a transaction involves updating data in two services, it's crucial to maintain a consistent state even if one service failsAtomicity
Guaranteeing atomicity, where either all or none of the operations within a transaction are executed, becomes intricate. Failures at any point in the transaction should result in a rollback of changes to maintain integrityIsolation
Isolation is challenging due to the distributed nature of microservices. Concurrent transactions from different services should not interfere with each other, ensuring each transaction's correctness.Durability
Durability across microservices is complex. If one service commits changes, and another service fails to persist its changes, it may lead to inconsistencies
Understanding Locking Mechanisms
Locking stands out as a primary mechanism for managing concurrency within the framework of microservices architecture.
Locks are synchronization primitives that help control the execution of threads or processes in a way that ensures mutual exclusion. Here's an overview of common locking mechanisms
Mutex (short for mutual exclusion) :
In computer science, a lock or mutex is a synchronization primitive that prevents state from being modified or accessed by multiple threads of execution at onceLet's delve into this through a practical example:
Imagine you have a counter variable that multiple threads need to increment. Without proper synchronization, if two or more threads attempt to increment the counter simultaneously, you could end up with incorrect results due to race conditionsimport threading class Counter: def __init__(self): self.value = 0 self.mutex = threading.Lock() # Creating a mutex def increment(self): with self.mutex: # Acquiring the lock using "with" statement current_value = self.value # Simulate some processing time | # (In a real scenario, this could be a more complex computation) current_value += 1 # Simulate some processing time self.value = current_value # Creating an instance of the Counter class counter = Counter() # Function to increment the counter multiple times def increment_counter(): for _ in range(100000): counter.increment() # Creating two threads to increment the counter concurrently thread1 = threading.Thread(target=increment_counter) thread2 = threading.Thread(target=increment_counter) # Starting the threads thread1.start() thread2.start() # Waiting for both threads to finish thread1.join() thread2.join() # Displaying the final value of the counter print("Final Counter Value:", counter.value)
Semaphore
Semaphores can be used for both exclusive and shared access to resources, depending on the count.
I will illustrate the concept using the same example, but this time, I'll implement it using semaphores.import threading class Counter: def __init__(self): self.value = 0 self.semaphore = threading.Semaphore(value=1) # Creating a semaphore with an initial value of 1 def increment(self): with self.semaphore: # Acquiring the semaphore using "with" statement current_value = self.value # Simulate some processing time current_value += 1 # Simulate some processing time self.value = current_value # Creating an instance of the Counter class counter = Counter() # Function to increment the counter multiple times def increment_counter(): for _ in range(100000): counter.increment() # Creating two threads to increment the counter concurrently thread1 = threading.Thread(target=increment_counter) thread2 = threading.Thread(target=increment_counter) # Starting the threads thread1.start() thread2.start() # Waiting for both threads to finish thread1.join() thread2.join() # Displaying the final value of the counter print("Final Counter Value:", counter.value)
Rlock
Rlock is Reentring lock that keeps track of the number of times a lock has been acquired and ensures that the same number of release operations are needed to unlock it.
Rlocks arises when a thread needs to acquire a lock multiple times without causing a deadlock.import threading class Counter: def __init__(self): self.value = 0 self.semaphore = threading.Semaphore(value=1) # Creating a semaphore with an initial value of 1 def increment(self): with self.semaphore: # Acquiring the semaphore using "with" statement current_value = self.value # Simulate some processing time # (In a real scenario, this could be a more complex computation) current_value += 1 # Simulate some processing time self.value = current_value # Creating an instance of the Counter class counter = Counter() # Function to increment the counter multiple times def increment_counter(): for _ in range(100000): counter.increment() # Creating two threads to increment the counter concurrently thread1 = threading.Thread(target=increment_counter) thread2 = threading.Thread(target=increment_counter) # Starting the threads thread1.start() thread2.start() # Waiting for both threads to finish thread1.join() thread2.join() # Displaying the final value of the counter print("Final Counter Value:", counter.value)
Distributed Locks
Distributed locks are used in distributed systems to synchronize processes or nodes across a network. They ensure that only one process in the entire distributed system can hold the lock at a time
One use case is :Preventing Duplicate Job Execution in a Distributed System
Consider a distributed system where multiple nodes are responsible for executing jobs, and each job should be executed only once across the entire system. To prevent duplicate execution, a distributed lock can be employed. Here's a simplified example using Python and a hypothetical distributed lock service
import time import threading from redis import Redis # Example: Using Redis as a distributed lock service class JobExecutor: def __init__(self, node_id): self.node_id = node_id self.distributed_lock = Redis() # Using Redis as the distributed lock service def execute_job(self, job_id): lock_key = f"job_lock:{job_id}" # Attempt to acquire the distributed lock with self.distributed_lock.lock(lock_key, blocking_timeout=5) as lock_acquired: if lock_acquired: try: # Execute the job since the lock is acquired print(f"Node {self.node_id} executing job {job_id}") time.sleep(2) # Simulate job execution time print(f"Node {self.node_id} completed job {job_id}") finally: # Release the distributed lock after job execution self.distributed_lock.lock_release(lock_key) else: print(f"Node {self.node_id} unable to acquire lock for job {job_id}. Job skipped.") # Creating two instances of JobExecutor representing different nodes node1_executor = JobExecutor(node_id=1) node2_executor = JobExecutor(node_id=2) # Function to simulate job execution def simulate_job_execution(node_executor, job_id): for _ in range(3): node_executor.execute_job(job_id) # Creating two threads to simulate concurrent job execution on different nodes thread1 = threading.Thread(target=simulate_job_execution, args=(node1_executor, "A")) thread2 = threading.Thread(target=simulate_job_execution, args=(node2_executor, "B")) # Starting the threads thread1.start() thread2.start() # Waiting for both threads to finish thread1.join() thread2.join()
Read-Write Locks
Read-Write locks distinguish between read access and write access to a shared resource. Multiple threads can acquire the read lock simultaneously for reading. Only one thread can acquire the write lock for writing, and it excludes all other threads (readers and writers)
This can be beneficial in scenarios where reads are more frequent than writes, and concurrent reads don't affect the consistency of the shared data.import threading import time class ReadWriteLock: def __init__(self): self._lock = threading.Lock() self._readers_count = 0 self._writers_count = 0 self._write_condition = threading.Condition() def acquire_read(self): with self._lock: while self._writers_count > 0: # Wait if there's an active writer self._lock.release() time.sleep(0.1) self._lock.acquire() self._readers_count += 1 def release_read(self): with self._lock: self._readers_count -= 1 if self._readers_count == 0: self._write_condition.notify() def acquire_write(self): with self._lock: self._writers_count += 1 while self._readers_count > 0 or self._writers_count > 1: # Wait if there are active readers or another writer self._write_condition.wait() def release_write(self): with self._lock: self._writers_count -= 1 self._write_condition.notify_all() # Shared resource shared_data = "Initial Data" rw_lock = ReadWriteLock() def read_operation(reader_id): while True: rw_lock.acquire_read() print(f"Reader-{reader_id} is reading: {shared_data}") rw_lock.release_read() time.sleep(1) def write_operation(writer_id): while True: rw_lock.acquire_write() shared_data = f"Data modified by Writer-{writer_id}" print(f"Writer-{writer_id} is writing: {shared_data}") rw_lock.release_write() time.sleep(2) # Creating multiple reader and writer threads readers = [threading.Thread(target=read_operation, args=(i,)) for i in range(3)] writers = [threading.Thread(target=write_operation, args=(i,)) for i in range(2)] # Starting the threads for reader in readers: reader.start() for writer in writers: writer.start() # Allowing threads to run for some time time.sleep(10) # Stopping the threads (for demonstration purposes) for reader in readers: reader.join() for writer in writers: writer.join()
Transactional-locking
Transactional locking, often associated with database systems and transactional processing, involves using locks within the context of a transaction. The goal is to ensure the consistency and isolation of transactions by preventing concurrent access to the same data during a transaction's execution. This type of locking is commonly employed in database management systems that support transactions (e.g., relational databases).
Here's an example illustrating the concept of transactional locking:
import threading
import time
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def deposit(self, amount):
with self.lock:
current_balance = self.balance
# Simulate processing time
time.sleep(0.1)
current_balance += amount
self.balance = current_balance
def withdraw(self, amount):
with self.lock:
current_balance = self.balance
if current_balance >= amount:
# Simulate processing time
time.sleep(0.1)
current_balance -= amount
self.balance = current_balance
def transfer(self, recipient, amount):
with self.lock:
self.withdraw(amount)
recipient.deposit(amount)
# Shared BankAccount instances
account1 = BankAccount(balance=1000)
account2 = BankAccount(balance=500)
# Function to simulate a transaction involving transfers between accounts
def perform_transaction():
for _ in range(5):
account1.transfer(account2, 50)
account2.transfer(account1, 30)
# Creating two threads to simulate concurrent transactions
thread1 = threading.Thread(target=perform_transaction)
thread2 = threading.Thread(target=perform_transaction)
# Starting the threads
thread1.start()
thread2.start()
# Waiting for both threads to finish
thread1.join()
thread2.join()
# Displaying the final balances of the accounts
print(f"Final balance of account1: {account1.balance}")
print(f"Final balance of account2: {account2.balance}")
The use of a lock ensures that the operations within each method (deposit, withdraw, transfer) are atomic. This helps maintain the integrity of the data by preventing concurrent access that could lead to inconsistencies, especially when transactions involve multiple steps.
Subscribe to my newsletter
Read articles from Firas SAADAOUI directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Firas SAADAOUI
Firas SAADAOUI
Software engineer with a robust background in Java development, certified in Scrum, and proficient in Azure. Extensive experience in the finance sector with a focus on microservices architecture. Skilled in designing, developing, and maintaining enterprise-level information systems for management, compliance, and reporting. Adept at working with databases, middlewares, and APIs, and implementing robust security measures. Demonstrates a deep understanding of Java programming and expertise in Industry Standard protocols.