Synchronization Mechanisms: Exploring Locks, Semaphores, and More

Firas SAADAOUI
7 min read
  • Concurrency Challenges in Microservices

In microservices architectures, concurrency is a substantial subject, encompassing numerous challenges that demand careful consideration and strategic solutions. These challenges include:
- Data Consistency :
The article Data: The Ultimate Hurdle in the Migration Journey to Microservices Architecture discusses this challenge in more detail.

- Distributed Transactions :

Implementing distributed transactions in a microservices architecture raises several challenges:

  1. Consistency
    If a transaction involves updating data in two services, it is crucial to maintain a consistent state even if one service fails.

  2. Atomicity
    Guaranteeing atomicity, where either all or none of the operations within a transaction are executed, becomes intricate. A failure at any point in the transaction should trigger a rollback of changes to maintain integrity.

  3. Isolation
    Isolation is challenging due to the distributed nature of microservices. Concurrent transactions from different services should not interfere with each other, ensuring each transaction's correctness.

  4. Durability
    Durability across microservices is complex. If one service commits its changes and another service fails to persist its own, the system is left in an inconsistent state.
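Since a classic two-phase commit is rarely practical across services, these properties are often approximated with patterns such as the saga: a sequence of local transactions, each paired with a compensating action that undoes it if a later step fails. A minimal sketch (the step and service names here are hypothetical, not from any framework):

```python
class SagaFailure(Exception):
    pass

def run_saga(steps):
    """Run (action, compensation) pairs; on failure, undo completed steps in reverse order."""
    completed = []
    for action, compensation in steps:
        try:
            action()
            completed.append(compensation)
        except Exception as exc:
            for undo in reversed(completed):
                undo()  # Best-effort rollback of the earlier local transactions
            raise SagaFailure("saga rolled back") from exc

log = []

# Hypothetical local transactions belonging to three different services
def create_order():   log.append("order created")
def cancel_order():   log.append("order cancelled")
def charge_payment(): log.append("payment charged")
def refund_payment(): log.append("payment refunded")
def reserve_stock():  raise RuntimeError("inventory unavailable")  # Simulated failure

try:
    run_saga([
        (create_order, cancel_order),
        (charge_payment, refund_payment),
        (reserve_stock, lambda: None),
    ])
except SagaFailure:
    pass

print(log)  # ['order created', 'payment charged', 'payment refunded', 'order cancelled']
```

Note that compensations are business-level undo operations (refund, cancel), not database rollbacks, which is why they must be designed individually for each step.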


  • Understanding Locking Mechanisms

Locking is a primary mechanism for managing concurrency within a microservices architecture.
Locks are synchronization primitives that control the execution of threads or processes in a way that ensures mutual exclusion. Here's an overview of common locking mechanisms:

  1. Mutex (short for mutual exclusion) :
    A lock, or mutex, is a synchronization primitive that prevents shared state from being modified or accessed by multiple threads of execution at once.

    Let's delve into this through a practical example:
    Imagine you have a counter variable that multiple threads need to increment. Without proper synchronization, if two or more threads attempt to increment the counter simultaneously, you could end up with incorrect results due to race conditions
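To make the race concrete, here is the same counter without any lock. Both threads can read the same value before either writes its result back, so increments are lost; whether the loss actually shows up on a given run depends on the interpreter and thread scheduling:

```python
import threading

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        current_value = self.value   # Read...
        current_value += 1           # ...modify...
        self.value = current_value   # ...write: another thread may interleave anywhere here

counter = UnsafeCounter()

def increment_counter():
    for _ in range(100000):
        counter.increment()

threads = [threading.Thread(target=increment_counter) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Expected 200000; any smaller value means updates were lost to the race
print("Final Counter Value:", counter.value)
```

The mutex version below closes this window by making the read-modify-write sequence atomic.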

     import threading
    
     class Counter:
         def __init__(self):
             self.value = 0
             self.mutex = threading.Lock()  # Creating a mutex
    
         def increment(self):
             with self.mutex:  # Acquiring the lock using "with" statement
                 current_value = self.value
                  # Simulate some processing time
                  # (In a real scenario, this could be a more complex computation)
                  current_value += 1
                 self.value = current_value
    
     # Creating an instance of the Counter class
     counter = Counter()
    
     # Function to increment the counter multiple times
     def increment_counter():
         for _ in range(100000):
             counter.increment()
    
     # Creating two threads to increment the counter concurrently
     thread1 = threading.Thread(target=increment_counter)
     thread2 = threading.Thread(target=increment_counter)
    
     # Starting the threads
     thread1.start()
     thread2.start()
    
     # Waiting for both threads to finish
     thread1.join()
     thread2.join()
    
     # Displaying the final value of the counter
     print("Final Counter Value:", counter.value)
    
  2. Semaphore

    A semaphore maintains an internal counter: each acquire decrements it, each release increments it, and a thread blocks when the counter reaches zero. Semaphores can therefore grant either exclusive or shared access to a resource, depending on the initial count.
    I will illustrate the concept using the same example, but this time implemented with a semaphore.

     import threading
    
     class Counter:
         def __init__(self):
             self.value = 0
             self.semaphore = threading.Semaphore(value=1)  # Creating a semaphore with an initial value of 1
    
         def increment(self):
             with self.semaphore:  # Acquiring the semaphore using "with" statement
                 current_value = self.value
                  # Simulate some processing time
                  current_value += 1
                 self.value = current_value
    
     # Creating an instance of the Counter class
     counter = Counter()
    
     # Function to increment the counter multiple times
     def increment_counter():
         for _ in range(100000):
             counter.increment()
    
     # Creating two threads to increment the counter concurrently
     thread1 = threading.Thread(target=increment_counter)
     thread2 = threading.Thread(target=increment_counter)
    
     # Starting the threads
     thread1.start()
     thread2.start()
    
     # Waiting for both threads to finish
     thread1.join()
     thread2.join()
    
     # Displaying the final value of the counter
     print("Final Counter Value:", counter.value)
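A semaphore initialized to 1, as above, behaves exactly like a mutex. With a larger initial count it grants shared access; for instance, capping how many threads may use a resource simultaneously. A small sketch (the simulated work and the peak tracking are illustrative):

```python
import threading
import time

pool = threading.Semaphore(3)    # At most 3 workers inside the guarded section at once
active = 0
peak = 0
counter_lock = threading.Lock()  # Protects the active/peak counters themselves

def worker():
    global active, peak
    with pool:                   # Blocks while 3 workers are already inside
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)         # Simulate work on the shared resource
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("Peak concurrent workers:", peak)  # Never exceeds 3
```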
    
  3. RLock

    An RLock (reentrant lock) keeps track of the number of times it has been acquired and requires the same number of release operations to unlock it.
    The need for RLocks arises when a thread must acquire the same lock multiple times, for example through nested method calls, without deadlocking on itself.

      import threading

      class Counter:
          def __init__(self):
              self.value = 0
              self.rlock = threading.RLock()  # Creating a reentrant lock

          def increment(self):
              with self.rlock:  # Re-acquired here when called from increment_by
                  current_value = self.value
                  current_value += 1
                  self.value = current_value

          def increment_by(self, n):
              with self.rlock:  # First acquisition by this thread
                  for _ in range(n):
                      self.increment()  # A plain Lock would deadlock on this nested acquire

      # Creating an instance of the Counter class
      counter = Counter()

      # Function to increment the counter multiple times
      def increment_counter():
          counter.increment_by(100000)

      # Creating two threads to increment the counter concurrently
      thread1 = threading.Thread(target=increment_counter)
      thread2 = threading.Thread(target=increment_counter)

      # Starting the threads
      thread1.start()
      thread2.start()

      # Waiting for both threads to finish
      thread1.join()
      thread2.join()

      # Displaying the final value of the counter
      print("Final Counter Value:", counter.value)
    
  4. Distributed Locks

    Distributed locks are used in distributed systems to synchronize processes or nodes across a network. They ensure that only one process in the entire distributed system can hold the lock at a time.
    One common use case is preventing duplicate job execution in a distributed system.

    Consider a distributed system where multiple nodes execute jobs, and each job should run only once across the entire system. To prevent duplicate execution, a distributed lock can be employed. Here's a simplified example using Python with Redis as the lock service:

     import time
     import threading
     from redis import Redis  # Example: Using Redis as a distributed lock service
    
     class JobExecutor:
         def __init__(self, node_id):
             self.node_id = node_id
             self.distributed_lock = Redis()  # Using Redis as the distributed lock service
    
         def execute_job(self, job_id):
             lock_key = f"job_lock:{job_id}"

             # Attempt to acquire the distributed lock (waits up to 5 seconds;
             # timeout=30 is the lock's expiry, so a crashed node cannot hold it forever)
             lock = self.distributed_lock.lock(lock_key, timeout=30, blocking_timeout=5)
             if lock.acquire():
                 try:
                     # Execute the job since the lock is acquired
                     print(f"Node {self.node_id} executing job {job_id}")
                     time.sleep(2)  # Simulate job execution time
                     print(f"Node {self.node_id} completed job {job_id}")
                 finally:
                     # Release the distributed lock after job execution
                     lock.release()
             else:
                 print(f"Node {self.node_id} unable to acquire lock for job {job_id}. Job skipped.")
    
     # Creating two instances of JobExecutor representing different nodes
     node1_executor = JobExecutor(node_id=1)
     node2_executor = JobExecutor(node_id=2)
    
     # Function to simulate job execution
     def simulate_job_execution(node_executor, job_id):
         for _ in range(3):
             node_executor.execute_job(job_id)
    
     # Creating two threads to simulate concurrent job execution on different nodes
     thread1 = threading.Thread(target=simulate_job_execution, args=(node1_executor, "A"))
     thread2 = threading.Thread(target=simulate_job_execution, args=(node2_executor, "B"))
    
     # Starting the threads
     thread1.start()
     thread2.start()
    
     # Waiting for both threads to finish
     thread1.join()
     thread2.join()
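Conceptually, a Redis-style lock reduces to an atomic "set this key if absent, with an expiry" operation, plus a token check so that only the current holder can release it. The in-memory stand-in below mimics that contract and is handy in tests (LocalLockService is a made-up class for illustration, not part of any library):

```python
import threading
import time
import uuid

class LocalLockService:
    """In-memory stand-in for a distributed lock: set-if-absent with expiry and owner token."""

    def __init__(self):
        self._locks = {}  # key -> (owner token, expiry timestamp)
        self._mutex = threading.Lock()

    def acquire(self, key, ttl=30.0):
        token = uuid.uuid4().hex
        with self._mutex:
            holder = self._locks.get(key)
            if holder is None or holder[1] < time.monotonic():  # Free or expired
                self._locks[key] = (token, time.monotonic() + ttl)
                return token
        return None

    def release(self, key, token):
        with self._mutex:
            holder = self._locks.get(key)
            if holder is not None and holder[0] == token:  # Only the owner may release
                del self._locks[key]
                return True
        return False

service = LocalLockService()
t1 = service.acquire("job_lock:A")
t2 = service.acquire("job_lock:A")        # Fails while t1 is held: returns None
released = service.release("job_lock:A", t1)
t3 = service.acquire("job_lock:A")        # Succeeds now that the lock was released
print(t1 is not None, t2 is None, released, t3 is not None)
```

The expiry and the owner-token check are what distinguish a distributed lock from an in-process one: they guard against crashed holders and against a slow node releasing a lock it no longer owns.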
    
  5. Read-Write Locks
    Read-Write locks distinguish between read access and write access to a shared resource. Multiple threads can acquire the read lock simultaneously for reading. Only one thread can acquire the write lock for writing, and it excludes all other threads (readers and writers).
    This can be beneficial in scenarios where reads are more frequent than writes, and concurrent reads don't affect the consistency of the shared data.

     import threading
     import time
    
      class ReadWriteLock:
          def __init__(self):
              # A single condition variable guards both pieces of state
              self._condition = threading.Condition()
              self._readers_count = 0
              self._writer_active = False

          def acquire_read(self):
              with self._condition:
                  while self._writer_active:
                      # Wait if there's an active writer
                      self._condition.wait()
                  self._readers_count += 1

          def release_read(self):
              with self._condition:
                  self._readers_count -= 1
                  if self._readers_count == 0:
                      self._condition.notify_all()

          def acquire_write(self):
              with self._condition:
                  while self._readers_count > 0 or self._writer_active:
                      # Wait if there are active readers or another writer
                      self._condition.wait()
                  self._writer_active = True

          def release_write(self):
              with self._condition:
                  self._writer_active = False
                  self._condition.notify_all()

      # Shared resource
      shared_data = "Initial Data"
      rw_lock = ReadWriteLock()
      stop_event = threading.Event()  # Used to stop the demo threads cleanly

      def read_operation(reader_id):
          while not stop_event.is_set():
              rw_lock.acquire_read()
              print(f"Reader-{reader_id} is reading: {shared_data}")
              rw_lock.release_read()
              time.sleep(1)

      def write_operation(writer_id):
          global shared_data  # Needed to rebind the module-level variable
          while not stop_event.is_set():
              rw_lock.acquire_write()
              shared_data = f"Data modified by Writer-{writer_id}"
              print(f"Writer-{writer_id} is writing: {shared_data}")
              rw_lock.release_write()
              time.sleep(2)

      # Creating multiple reader and writer threads
      readers = [threading.Thread(target=read_operation, args=(i,)) for i in range(3)]
      writers = [threading.Thread(target=write_operation, args=(i,)) for i in range(2)]

      # Starting the threads
      for reader in readers:
          reader.start()

      for writer in writers:
          writer.start()

      # Allowing threads to run for some time
      time.sleep(10)

      # Stopping the threads (for demonstration purposes)
      stop_event.set()
      for reader in readers:
          reader.join()

      for writer in writers:
          writer.join()
    
  6. Transactional Locking

Transactional locking, often associated with database systems and transactional processing, involves using locks within the context of a transaction. The goal is to ensure the consistency and isolation of transactions by preventing concurrent access to the same data during a transaction's execution. This type of locking is commonly employed in database management systems that support transactions (e.g., relational databases).
Here's an example illustrating the concept of transactional locking:

import threading
import time

class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.RLock()  # Reentrant, so transfer() can call withdraw()/deposit()

    def deposit(self, amount):
        with self.lock:
            current_balance = self.balance
            # Simulate processing time
            time.sleep(0.1)
            current_balance += amount
            self.balance = current_balance

    def withdraw(self, amount):
        with self.lock:
            current_balance = self.balance
            if current_balance >= amount:
                # Simulate processing time
                time.sleep(0.1)
                current_balance -= amount
                self.balance = current_balance

    def transfer(self, recipient, amount):
        # Acquire both account locks in a consistent (id-based) order so that
        # two transfers running in opposite directions can never deadlock each other
        first, second = sorted((self, recipient), key=id)
        with first.lock, second.lock:
            self.withdraw(amount)
            recipient.deposit(amount)

# Shared BankAccount instances
account1 = BankAccount(balance=1000)
account2 = BankAccount(balance=500)

# Function to simulate a transaction involving transfers between accounts
def perform_transaction():
    for _ in range(5):
        account1.transfer(account2, 50)
        account2.transfer(account1, 30)

# Creating two threads to simulate concurrent transactions
thread1 = threading.Thread(target=perform_transaction)
thread2 = threading.Thread(target=perform_transaction)

# Starting the threads
thread1.start()
thread2.start()

# Waiting for both threads to finish
thread1.join()
thread2.join()

# Displaying the final balances of the accounts
print(f"Final balance of account1: {account1.balance}")
print(f"Final balance of account2: {account2.balance}")

The use of a lock ensures that the operations within each method (deposit, withdraw, transfer) are atomic. This helps maintain the integrity of the data by preventing concurrent access that could lead to inconsistencies, especially when transactions involve multiple steps.
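In an actual database, the same idea is expressed through transaction-scoped locks rather than application-level mutexes. A minimal sketch with Python's built-in sqlite3 (note that SQLite locks the whole database file for writing, not individual rows, which makes the effect easy to observe):

```python
import sqlite3
import tempfile
import os

path = os.path.join(tempfile.mkdtemp(), "bank.db")

# isolation_level=None puts the connection in autocommit mode,
# so transactions are controlled explicitly with BEGIN/COMMIT
conn1 = sqlite3.connect(path, timeout=0.1, isolation_level=None)
conn2 = sqlite3.connect(path, timeout=0.1, isolation_level=None)

conn1.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn1.execute("INSERT INTO accounts VALUES (1, 1000), (2, 500)")

conn1.execute("BEGIN IMMEDIATE")  # Takes the database write lock for this transaction
conn1.execute("UPDATE accounts SET balance = balance - 50 WHERE id = 1")

blocked = False
try:
    conn2.execute("BEGIN IMMEDIATE")  # Blocked: conn1 still holds the write lock
except sqlite3.OperationalError:
    blocked = True
print("Second transaction blocked:", blocked)

conn1.execute("UPDATE accounts SET balance = balance + 50 WHERE id = 2")
conn1.execute("COMMIT")  # Releases the lock; other transactions may now proceed

balances = dict(conn2.execute("SELECT id, balance FROM accounts"))
print(balances)  # {1: 950, 2: 550}
```

Either both UPDATE statements become visible or neither does: the second connection cannot observe, or interleave with, the half-finished transfer.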

