crowRedis Data Replication, delving into distributed systems.

Biohacker0
12 min read

How I implemented the master/worker (previously known as master/slave) data replication architecture for distributed databases in my project crowRedis.

If you want to know what I have done so far, read the previous blog:

Scale Relational Database

GitHub Link - https://github.com/biohacker0/crowRedis/tree/dataReplication-1

The replication feature is currently on the dataReplication-1 branch only, not on the main branch.

Why do this?:

Where would the fun be if I just left crowRedis doing set, get and those transactions? That is just language play.
You don't get good just by doing the bare minimum; I want to tackle most of the big-sounding concepts.

And this was my solution to data replication. I wrote on paper how I would do it, and I iterated 4 times on how I would replicate data if someone told me:
hey, I have this database-1 and I want to connect my other databases to this one and replicate the data of database-1 to all the connected databases.

You don't need to read someone else's blog to do this; the problem is straightforward, and you should think about how you would do it.

Isn't my handwriting beautiful? ( ˘͈ ᵕ ˘͈♡)

Master / Worker dynamic mode

** Currently we replicate data for the SET command only, not for delete or other commands; I will maybe add those afterwards. **

Yes, you can run my script as either master or worker, based on the user's choice.

For a clean test, you can clone the same repo twice: name one folder instance-1 and run the script there in master mode, and name the other instance-2 and run it there in worker mode, for clarity.

    def start(self):
        replication_thread = None

        if self.is_master:
            replication_thread = threading.Thread(target=self.run_replication_log, name='replication-thread')
            print("master thread started")
        else:
            # If this server is a worker, connect to the master
            try:
                self.worker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                master_host, master_port = self.master_address.split(':')
                master_port = int(master_port)
                self.worker_socket.connect((master_host, master_port))
                self.worker_socket.send(b"REGISTER")
                print(f"Connected to master server at {self.master_address}")
                replication_thread = threading.Thread(target=self.replicate_data_from_master, name='worker-replication-thread')
                print("worker thread started")
            except Exception as e:
                print(f"Error connecting to master: {e}")
                # Handle errors, e.g., by setting self.worker_socket to None
                return

        # Start the replication thread
        replication_thread.start()

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((self.host, self.port))
            server_socket.listen()

            print(f"Server listening on {self.host}:{self.port}")

            while True:
                try:
                    client_socket, client_address = server_socket.accept()
                    print(f"Accepted connection from {client_address[0]}:{client_address[1]}")
                    threading.Thread(target=self.handle_client, args=(client_socket,client_address)).start()
                except Exception as e:
                    print(f"Error accepting client connection: {e}")



if __name__ == "__main__":
    is_master = input("Are you running as a master (y/n)? ").lower() == 'y'

    if is_master:
        host = '127.0.0.1'     
        port = 6381
        master_address = None
        worker_port = None
        redis_server = RedisServer(host, port, is_master, master_address)
    else:
        master_address = '127.0.0.1:6381'
        host = '127.0.0.1'
        worker_port = 6382
        redis_server = RedisServer(host, worker_port, is_master, master_address)

    redis_server.start()

1. How it works as Master and Worker Dynamically:

The server can operate as either a master or a worker, depending on the choice made when the script is executed. Here's how it works in each mode:

Master Mode:

  • In master mode👑, the server acts as the primary data source.

  • When starting in master mode, the server listens for incoming client connections on a specified host and port (e.g., host: 127.0.0.1 and port: 6381)🚀.

  • It also starts a replication thread (🏃‍♂️run_replication_log) that logs write operations into a queue (the log_queue). This is the replication log🔄.

  • Clients can connect to the master to execute various Redis commands💼.

  • If a client is a worker, it can register with the master using the "REGISTER"👥 command. The master adds the connected worker's socket to a list of connected workers.

  • The master continuously logs write operations, and it also manages replication to all registered worker servers.🔄

  • Data is replicated from the master to all connected workers by sending the replication log entries to each worker's socket.🔄

  • So both workers and normal clients can connect to the master. The worker lets the master know that it is a worker client by sending the msg "REGISTER" at connect time.

Worker Mode:

1. Worker as an Independent Server:

    • When the server is started in worker mode, it connects to the master for data replication.💪

    • However, the worker server is not restricted to just replicating data; it also behaves like a typical Redis server.🤖

    • It can accept client connections and handle read (GET) and write (SET, DEL, etc.) queries independently, just like a master server.🤝

    • When clients connect to the worker server and send read queries (e.g., GET), the worker can respond to these queries based on its own data.📚

    • It maintains its own data store, which is a copy of the data it replicates from the master. The data store is used to handle read queries locally.📦

2. Replication from the Master:

  • 🚀In addition to being an independent server, the worker server also continuously connects to the master to replicate data.

  • It sends a "REGISTER"🔌 command to the master to indicate its intention to replicate.

  • The master, upon receiving the registration request, adds the worker to its list of connected workers.📋

  • 🚢The worker's primary role, in this case, is to receive and apply replication data sent by the master. The replication data typically includes write operations (e.g., SET, DEL) that the master has received from its clients.🚢

  • 🔄The worker processes these replication data entries as plain Redis commands and applies them to its local data store. This ensures that the worker's data remains consistent with the master's data (the quick test sketch below shows this flow end to end).🔄
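
If you want to sanity-check that flow end to end, here is a minimal test sketch. My assumptions: the master listens on 127.0.0.1:6381 and the worker on 127.0.0.1:6382 (as in the __main__ snippet above), and both speak plain-text commands like SET key value / GET key.

import socket
import time

def send_command(host, port, command):
    # Open a short-lived connection, send one plain-text command, return the raw reply
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        s.send(command.encode('utf-8'))
        return s.recv(1024).decode('utf-8')

print(send_command('127.0.0.1', 6381, 'SET fruit mango'))  # write goes to the master

time.sleep(1)  # give the replication thread a moment to push the log entry

print(send_command('127.0.0.1', 6382, 'GET fruit'))        # read the copy from the worker

If replication worked, the GET against the worker returns the value you set on the master.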

If you have read this, I know you must have thought of 5-10 cases where my code would crumble or there would be data inconsistency.

I know that too. But let's deal with one problem at a time.

And if you want to run multiple worker servers, you need to tweak the code a bit: in the else part, just take the host and worker port as input from the user, so you can run on whatever port you like and there is no conflict.
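
For example (a rough sketch, not committed to the repo; adjust it to the actual RedisServer constructor), the else branch could become:

    else:
        # Ask the user where the master is and which port this worker should use,
        # so several workers can run side by side without a port conflict
        master_address = input("Master address (host:port) [127.0.0.1:6381]: ") or '127.0.0.1:6381'
        host = input("Worker host [127.0.0.1]: ") or '127.0.0.1'
        worker_port = int(input("Worker port (e.g. 6382, 6383): "))
        redis_server = RedisServer(host, worker_port, is_master, master_address)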

Huge respect for backend/network engineers; these ports and socket things fried my brain.


Once it's done it looks simple, but believe me, the theory of TCP is very different from the implementation.

Also, sockets are a sort of abstraction layer that provides a high-level API for interacting with the TCP and UDP protocols.

So pay attention in networking classes where they teach about ports and how TCP/UDP work, else you will bang your head on the desk like me, because you won't know where the connection failed, why it failed, and what caused the failure.

3-4 online frens and I sat down and debugged my socket mess at night (2 am) on Discord.


Data Replication in crowRedis:

I've implemented data replication, a crucial feature for distributed systems. Data replication ensures that data remains consistent across multiple database instances, providing fault tolerance and load balancing.

1. Initialization: The data replication process begins during server initialization. I've designed this code to allow one crowRedis instance to act as a master and another as a worker. You, as the system administrator, decide the role during startup.

2. Master-Worker Connection: If you choose to run the server as a master, you'll create a socket and bind it to a specific host and port, making it accessible for other servers to connect. If the server is running as a worker, it connects to the master server using the specified IP address and port.

if __name__ == "__main__":
    is_master = input("Are you running as a master (y/n)? ").lower() == 'y'

    if is_master:
        host = '127.0.0.1'     
        port = 6381
        master_address = None
        worker_port = None
        redis_server = RedisServer(host, port, is_master, master_address)
    else:
        master_address = '127.0.0.1:6381'
        host = '127.0.0.1'
        worker_port = 6382
        redis_server = RedisServer(host, worker_port, is_master, master_address)

    redis_server.start()

3. Replication Log Queue: To achieve data replication, I've created a queue called log_queue. This queue stores replication log entries generated by the master server. When changes are made to the data store (e.g., a SET command), the log entry is added to the queue.

log_queue = queue.Queue()

4. Replication Thread: A dedicated thread, named replication-thread for the master server and worker-replication-thread for the worker server, is responsible for managing the replication log.

Master Server: When the server is running as a master, the run_replication_log function is executed. This function constantly checks the log queue for new log entries. When a new entry is available, it's sent to connected worker servers.

replication_thread = threading.Thread(target=self.run_replication_log, name='replication-thread')
replication_thread.start()

The run_replication_log function runs on the master server. It continually checks a log queue for new entries. These log entries represent commands that have modified the data in the master server, e.g., "SET key value." When a new log entry is added to the queue, it's sent to all connected worker servers. The master informs the workers about data changes through these log entries.
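
The post doesn't show run_replication_log itself, so here is a minimal sketch of what it roughly does on the master (log_queue, connected_workers and send_replication_log are the same names used elsewhere in this post; treat the details as an approximation, not the exact code in the repo):

    def run_replication_log(self):
        # Master-side loop: drain the log queue and fan each entry out to the workers
        while True:
            try:
                # Wait briefly for a write command like "SET key value" to be queued
                log_entry = self.log_queue.get(timeout=1)
                self.send_replication_log(log_entry)
            except queue.Empty:
                # Nothing to replicate right now, keep polling
                continue
            except Exception as e:
                logging.error(f"Error in replication log thread: {e}")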

Worker Server: The worker server, on the other hand, connects to the master server and initiates the replicate_data_from_master function in a separate thread. This function is responsible for receiving replication log data from the master and applying it to the local data store.

    def replicate_data_from_master(self):
        logging.debug("replicate_data_from_master called")
        while True:
            try:
                data = self.worker_socket.recv(1024).decode('utf-8')
                if data:
                    logging.debug(f"Received data from master: {data}")
                    # Process the received data and apply replication
                    self.apply_replication_data(data)
                else:
                    logging.debug("No data received from master")
            except ConnectionResetError:
                logging.error("Connection with master reset. Reconnecting...")

            except Exception as e:
                logging.debug(f"Worker socket - {self.worker_socket}")
                logging.error(f"Error replicating data from master: {e}")
                # Stop the replication thread if an unexpected error occurs
                return

Applying Replication Data: In the apply_replication_data function, I parse and execute the replication log entries received by the worker. This involves checking the command (e.g., SET, DEL) and applying it to the local data store.

def apply_replication_data(self, data):
    parts = data.strip().split()
    command = parts[0].upper()

    if command == "SET":
        self.handle_set(None, parts)
    # Handle other replication commands as needed

Telling the master to let us connect and register:

self.worker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.worker_socket.connect((master_host, master_port))
self.worker_socket.send(b"REGISTER")
replication_thread = threading.Thread(target=self.replicate_data_from_master, name='worker-replication-thread')
replication_thread.start()

We register the Worker:

    def handle_worker_registration(self, client_socket,client_address):
        # Handle worker registration here
        logging.debug("handle_worker_registration called")
        if self.is_master:
            # Add the connected worker socket to the list
            self.connected_workers.append(client_socket)
            logging.debug(f"Worker registered from {client_address[0]}:{client_address[1]}")
            logging.debug(f"Number of connected workers: {len(self.connected_workers)}")
            logging.debug(f"Connected workers: {self.connected_workers}")

5. Replication Log Entry: A replication log entry is a string that represents a Redis command, such as "SET key value." The master server generates these entries when data changes and pushes them into the log_queue.

log_entry = f"SET {key} {value}"
self.log_queue.put(log_entry)
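
Putting it together, the master-side handle_set does three things: apply the write locally, acknowledge the client, and enqueue the log entry for the workers. A simplified sketch (the store name data_store and the exact reply are my assumptions; the real handle_set in the repo also deals with AOF logging):

    def handle_set(self, client_socket, parts):
        # parts looks like ["SET", key, value]
        key, value = parts[1], parts[2]
        self.data_store[key] = value  # 1. apply the write locally

        # 2. ack the client; skipped when apply_replication_data calls us with None
        if client_socket is not None:
            client_socket.send(b"OK")

        # 3. on the master, queue the entry so it reaches the workers
        if self.is_master:
            log_entry = f"SET {key} {value}"
            self.log_queue.put(log_entry)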

6. Replication Data Transmission: In the send_replication_log function, I loop through connected worker servers and send them the replication log entries. The worker server receives these entries through its replicate_data_from_master function and processes them.

def send_replication_log(self, log_entry):
    # Fan the log entry out to every registered worker socket
    for worker_socket in self.connected_workers:
        worker_socket.send(log_entry.encode('utf-8'))

7. Ensuring Consistency: By following this replication lifecycle, data changes made in the master server are transmitted to worker servers, ensuring data consistency across the cluster. This is a fundamental mechanism for high availability and disaster recovery in distributed systems.

I can write more and more, but I think if you read the code and understand the flow, it will be faster; it's not that complex.

| Master Server (is_master=True)  | Worker Server (is_master=False) |
|---------------------------------|---------------------------------|
| AOF mode enabled                | AOF mode enabled                |
| Main server loop starts         | Connects to master              |
| Replication log thread starts   | Replication thread starts       |
|   - Queue log entries           |   - Receive replication data    |
|   - Send log entries to workers |   - Apply replication data      |
| Handle client requests          | Handle client requests          |
|   - Update data store           |   - Update data store           |
|   - Append to AOF log           |   - Append to AOF log           |
| Transaction handling            | Transaction handling            |
| Replication log thread          | Replication thread              |


Issues I faced this time:

Previous things were straightforward, and this was a spiral mess.

1st: So you can set data that gets used by normal clients, or replicated by workers on their side. Now this is a tricky thing, because both send socket commands to the master. During replication, the worker just sets the data and doesn't send anything back as a response, but I had written the code such that when you set data, a msg is sent back to the client via the socket, and this would mess up the workers, because the worker's replication function was not made to receive and handle an "ok" msg being sent back to the client.

So what would happen is: I set data on the master, it sends it to the worker, and the worker would just close the connection, because it would try to send a socket response back, which the replication function is not made to handle, and it would just close the socket.

When I explain the issue now, it is very clear what it was, but when I was testing it, in that zone, it took me too long to figure out why I could send one piece of data and have it replicated, but then couldn't send any more data.

Even after I found that the connection was being closed, I didn't know why and what was closing it, because sockets usually stay open.


2nd: uhh, I was listening for worker requests at the start of the program using one socket, where it was receiving requests and checking if the msg is "REGISTER", to see if a worker is registering.

It was something like message_bytes = client_socket.recv(8); you see, REGISTER is 8 bytes, so it would handle that accurately.

But you see, since it's at the top, it would also receive the commands from clients like SET KEY VALUE, and these commands are more than 8 bytes, right? So I would just get an error, and I couldn't figure out why I was not able to use SET KEY VALUE; previously it worked lol.

And there is another recv on the client_socket that receives 1024 bytes of data; this is what used to handle my SET, GET etc. commands, but since I put
message_bytes = client_socket.recv(8)
at the top, it would try to receive these commands too, and I would get an error.
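
To see why that recv(8) bites, here is a tiny standalone sketch (nothing crowRedis-specific): recv(8) only pulls the first 8 bytes off the socket, so a longer command like SET KEY VALUE comes out chopped, and the leftover bytes only show up on the next read.

import socket

# Throwaway server/client pair just to show what recv(8) does to a longer command
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))   # port 0 = let the OS pick a free port
server.listen()

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(server.getsockname())
client.send(b"SET KEY VALUE")

conn, _ = server.accept()
print(conn.recv(8))   # b'SET KEY ' -> only the first 8 bytes
print(conn.recv(8))   # b'VALUE'    -> the rest arrives on the next recv

client.close()
conn.close()
server.close()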

Stupid brain, it should have thought of this; caused too much pain in my empty head.


Solution:

    def handle_client(self, client_socket, client_address):
        try:
            while True:
                request = client_socket.recv(1024).decode('utf-8')
                if not request:
                    break
                if self.is_master and request == "REGISTER":
                    self.handle_worker_registration(client_socket, client_address)
                parts = request.strip().split()
                command = parts[0].upper()

                if command == "SET":
                    self.handle_set(client_socket, parts)
                elif command == "GET":
                    self.handle_get(client_socket, parts)
                # (other commands are handled the same way)
        except Exception as e:
            print(f"Error handling client: {e}")

    def handle_worker_registration(self, client_socket,client_address):
        # Handle Worker registration here
        logging.debug("handle_worker_registration called")
        if self.is_master:
            # Add the connected worker socket to the list
            self.connected_workers.append(client_socket)

So we receive all requests in one place and then differentiate what to do based on the msg: the client will send a msg and so will the worker, and based on that msg we call the appropriate functions.
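
For instance, both kinds of traffic now go through the same listening socket, and only the first message decides the role (assuming the master is on 127.0.0.1:6381 as in the rest of this post):

import socket

# A worker announces itself: same port as everyone else, but its first message is REGISTER
worker = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
worker.connect(('127.0.0.1', 6381))
worker.send(b"REGISTER")            # handle_client routes this to handle_worker_registration

# A normal client hits the exact same entry point, just with a command
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 6381))
client.send(b"SET fruit mango")     # handle_client routes this to handle_set
print(client.recv(1024).decode('utf-8'))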



Current Limitations:

1: Data inconsistency and synchronization issues:

When running the server in worker mode, it not only replicates data from the master but can also act as an independent server where you can write data. This can lead to data inconsistency if you write data to the worker or multiple workers, as the workers may contain data that the master does not possess or have knowledge of.

This is a common issue in distributed computing.

One potential solution is to restrict the worker servers to only receive data and disable their write operations (see the sketch below).
Or I can transform each server into a peer-to-peer network where everyone shares data; I've heard that there are specific protocols and algorithms designed for this purpose.
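
A minimal sketch of the first option (hypothetical, this is not in the repo): add a guard inside handle_client's loop so a worker rejects writes from ordinary clients. Replication still works, because apply_replication_data calls handle_set directly and never goes through handle_client.

                # Hypothetical guard: goes inside the while loop of handle_client,
                # right after `command` is parsed
                WRITE_COMMANDS = {"SET", "DEL"}
                if not self.is_master and command in WRITE_COMMANDS:
                    client_socket.send(b"ERROR: this instance is a read-only worker")
                    continue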


Written by

Biohacker0

I am a software engineer and a bioinformatics researcher. I find joy in learning how things work and diving into rabbit holes. JavaScript + python + pdf's and some good music is all I need to get things done. Apart from Bio and software , I am deeply into applied physics. Waves, RNA, Viruses, drug design , Lithography are something I will get deep into in next 2 years. I will hack biology one day