Implementing Event Loops in Go: A Practical Approach

Aniket Mahangare

Ever wondered how single-threaded applications like Redis are able to handle thousands of clients concurrently (“perceived” concurrency)? The answer is “Event Loops”. In this article, we will dive deep into how event loops work & how to implement them in Go.

Event Loops

An event loop is a system that continuously listens for events (like user actions or messages) and handles each one sequentially. This allows programs to manage multiple tasks smoothly and efficiently using just a single thread.

Concurrency Models for Server Architecture

There are two basic concurrency models for server architecture:

  1. Thread-Per-Request: This model uses a separate thread to handle each incoming client request. When a new request arrives, the server creates a new thread (or utilizes one from a thread pool) to process it independently.

  2. I/O Multiplexing: This model allows a single thread (or a limited number of threads) to monitor and manage multiple I/O streams (like network sockets, files, or pipes) simultaneously. Instead of dedicating a thread to each request, the server uses mechanisms to detect when I/O operations (like reading or writing data) are ready to be performed. The thread then acts on the events raised on these streams.

The key challenge of the Thread-Per-Request model is that the application must be thread-safe. This requires locking mechanisms, which increase code complexity and slow down execution, since multiple threads may compete to acquire the lock for a critical section.

Single-threaded programs don’t need to handle thread safety, so the CPU time allocated to them can be used more efficiently. Single-threaded applications usually rely on I/O multiplexing to implement event loops so that they can serve clients concurrently.
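
For contrast, here is a minimal sketch of the thread-per-request style using Go’s standard library, where each accepted connection gets its own goroutine. The port and the handleConn echo handler are illustrative assumptions, not part of the server built later in this article.

package main

import (
    "log"
    "net"
)

// handleConn is a hypothetical per-connection handler: each client gets its
// own goroutine, so any shared state would need to be made thread-safe.
func handleConn(conn net.Conn) {
    defer conn.Close()
    buf := make([]byte, 1024)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            return // client closed the connection or an error occurred
        }
        conn.Write(buf[:n]) // echo the data back
    }
}

func main() {
    ln, err := net.Listen("tcp", "127.0.0.1:8080")
    if err != nil {
        log.Fatal(err)
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Printf("accept error: %v", err)
            continue
        }
        go handleConn(conn) // one goroutine ("thread") per connection
    }
}

The rest of this article builds the second model: a single thread that multiplexes all connections through kqueue.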

Key Concepts

User Space vs. Kernel Space

  • User Space: User space is the environment where user-facing applications run. This includes applications such as web servers, Chrome, text editors, and command utilities. User space applications cannot directly access the system’s hardware resources. They must make system calls to the kernel to request access to these resources.

  • Kernel Space: Kernel space is where the core of the operating system, the kernel, operates. The kernel is responsible for managing the system’s resources, such as the CPU, memory, and storage. It also provides system calls, which are interfaces that allow user space applications to interact with the kernel. The kernel has unrestricted access to the system’s hardware resources. This is necessary for the kernel to perform its essential tasks, such as scheduling processes, managing memory, and handling interrupts.

    💡
    You can read more about the User-space, Kernel-space, and System Calls here.
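
As a tiny illustration of crossing the user-space/kernel-space boundary, the sketch below makes two system calls directly through the golang.org/x/sys/unix package (the same package used throughout this article). Normally you would go through higher-level wrappers like fmt or os, which make these calls for you.

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    // Getpid is a thin wrapper over the getpid(2) system call.
    pid := unix.Getpid()

    // Write is a thin wrapper over the write(2) system call;
    // file descriptor 1 is standard output.
    msg := []byte(fmt.Sprintf("hello from PID %d via a raw write syscall\n", pid))
    if _, err := unix.Write(1, msg); err != nil {
        panic(err)
    }
}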

Kernel Buffers

  • Receive Buffer: When data arrives from a network or other I/O source, it's stored in a kernel-managed buffer until the application reads it.

  • Send Buffer: Data that an application wants to send is stored in a kernel buffer before being transmitted over the network or I/O device.
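
To make these buffers a bit more concrete: the kernel exposes their sizes as socket options, which you can query or tune. A minimal sketch, assuming fd is a socket file descriptor created the way we create one later in this article:

// Query the kernel receive and send buffer sizes for this socket.
rcv, err := unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF)
if err != nil {
    return fmt.Errorf("failed to read SO_RCVBUF: %v", err)
}
snd, err := unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF)
if err != nil {
    return fmt.Errorf("failed to read SO_SNDBUF: %v", err)
}
log.Printf("receive buffer: %d bytes, send buffer: %d bytes", rcv, snd)

// Optionally ask the kernel for a larger receive buffer
// (the kernel may round or cap the requested value).
if err := unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, 1<<20); err != nil {
    return fmt.Errorf("failed to set SO_RCVBUF: %v", err)
}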

File Descriptors (FDs)

  • Definition: Integers that uniquely identify an open file, socket, or other I/O resource within the operating system.

  • Usage: Applications use FDs to perform read/write operations on these resources.

💡
I highly recommend you watch these YouTube videos on File Descriptors & System Calls in Linux. TL;DR: everything in Unix/Linux is a file, and the OS provides system calls to interact with resources.
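
To see file descriptors in action, here is a minimal sketch that opens a file and creates a socket, then prints the integer descriptors the OS assigned to them (the /etc/hosts path is just an example; descriptors 0, 1, and 2 are already taken by stdin, stdout, and stderr, so you will typically see 3 and up):

package main

import (
    "log"
    "os"

    "golang.org/x/sys/unix"
)

func main() {
    // Opening a file yields a file descriptor.
    f, err := os.Open("/etc/hosts")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    log.Printf("file descriptor for /etc/hosts: %d", f.Fd())

    // Creating a socket also yields a file descriptor -- "everything is a file".
    sock, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
    if err != nil {
        log.Fatal(err)
    }
    defer unix.Close(sock)
    log.Printf("file descriptor for the socket: %d", sock)
}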

I/O Multiplexing Mechanisms

kqueue (on macOS/BSD) and epoll (on Linux) are kernel mechanisms for scalable, efficient I/O event notification. In simple words, you subscribe to certain kernel events and get notified when any of those events occur. These system calls are designed for scalable situations such as a web server where thousands of concurrent connections are handled by one server.

In this article, I will focus on using kqueue, however, I will share the GitHub repo with code for implementation using epoll.

💡
You can read more on these system calls here.
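
Since this walkthrough uses kqueue, here is a rough sketch of the equivalent epoll calls on Linux, using the serverFD and maxClients names defined in the steps below; the full epoll implementation lives in the GitHub repo linked at the end.

// Create an epoll instance (the Linux counterpart of unix.Kqueue()).
epfd, err := unix.EpollCreate1(0)
if err != nil {
    return fmt.Errorf("failed to create epoll instance: %v", err)
}
defer unix.Close(epfd)

// Register a file descriptor for read events
// (the counterpart of EV_ADD + EVFILT_READ in a kqueue Kevent_t).
event := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(serverFD)}
if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, serverFD, &event); err != nil {
    return fmt.Errorf("failed to register server FD with epoll: %v", err)
}

// Block until events arrive (the counterpart of the blocking unix.Kevent call);
// a timeout of -1 means wait indefinitely.
events := make([]unix.EpollEvent, maxClients)
n, err := unix.EpollWait(epfd, events, -1)
if err != nil {
    return fmt.Errorf("epoll_wait error: %v", err)
}
// events[:n] now holds the descriptors that are ready for I/O.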

Implementation of I/O Multiplexing in GoLang

In Go, we can use the golang.org/x/sys/unix package to access low-level system calls like kqueue on Unix-like systems.

Step 1: Define Server Configuration

Create a configuration struct or use variables to hold server parameters.

var (
    host       = "127.0.0.1" // Server IP address
    port       = 8080        // Server port
    maxClients = 20000       // Maximum number of concurrent clients
)

Step 2: Create the Server Socket

A new socket can be created using the unix.Socket function. A socket can be thought of as an endpoint in a two-way communication channel. Socket routines create the communication channel, and the channel is used to send data between application programs either locally or over networks. Each socket within the network has a unique name associated with it called a socket descriptor—a full-word integer that designates a socket and allows application programs to refer to it when needed.

In simpler terms, a socket is like a door through which data enters and exits a program over the network. It enables inter-process communication, either on the same machine or across different machines connected via a network. Each of these sockets is assigned a file descriptor when they are created.

serverFD, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
if err != nil {
    return fmt.Errorf("socket creation failed: %v", err)
}
defer unix.Close(serverFD)
  • unix.AF_INET: This option specifies that the socket will use IPv4 Internet protocol.

  • unix.SOCK_STREAM: This option provides reliable, ordered, and error-checked delivery of a stream of bytes, typically using TCP.

  • unix.IPPROTO_TCP: This option specifies that the socket will use the TCP protocol for communication, ensuring reliable data transmission.

💡
Check this documentation by IBM to read more on sockets.

Step 3: Set Socket Options

Set Non-blocking Mode

Setting a socket to non-blocking mode ensures that I/O operations return immediately without waiting. When a socket operates in this mode:

  • accept: If there are no incoming connections, it immediately returns an error (EAGAIN or EWOULDBLOCK) instead of waiting.

  • read/recv: If there's no data to read, it immediately returns an error instead of blocking.

  • write/send: If the socket's buffer is full and can't accept more data, it immediately returns an error instead of waiting.

if err := unix.SetNonblock(serverFD, true); err != nil {
    return fmt.Errorf("failed to set non-blocking mode: %v", err)
}

Allow Address Reuse

This is particularly useful in scenarios where you need to restart a server quickly without waiting for the operating system to release the port.

if err := unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
    return fmt.Errorf("failed to set SO_REUSEADDR: %v", err)
}

Step 4: Bind and Listen

Bind the Socket

Socket binding involves linking a socket to a specific local address and port on your computer. Essentially, it tells the operating system, "Hey, my application is ready to handle any network traffic that comes to this address and port."

When you're setting up a server in network programming, binding is a crucial first step. Before your server can start accepting connections or receiving data, it needs to bind its socket to a chosen address and port. This connection point is where clients will reach out to connect or send information.

addr := &unix.SockaddrInet4{Port: port}
copy(addr.Addr[:], net.ParseIP(host).To4())

if err := unix.Bind(serverFD, addr); err != nil {
    return fmt.Errorf("failed to bind socket: %v", err)
}

unix.SockaddrInet4: This struct holds the IPv4 address of the host & the port of your server.

Start Listening

if err := unix.Listen(serverFD, maxClients); err != nil {
    return fmt.Errorf("failed to listen on socket: %v", err)
}

Step 5: Initialize kqueue

kq, err := unix.Kqueue()
if err != nil {
    return fmt.Errorf("failed to create kqueue: %v", err)
}
defer unix.Close(kq)
  • unix.Kqueue(): Creates a new kernel event queue and returns a file descriptor associated with this kqueue.

Step 6: Register Server FD with kqueue

Register the file descriptor associated with the server socket to monitor for incoming connections. Just to reiterate, everything in Linux/Unix is a file. Basically, when clients want to establish a connection with our server, kqueue watches for these events & notifies our application so it can take action accordingly.

kev := unix.Kevent_t{
    Ident:  uint64(serverFD),
    Filter: unix.EVFILT_READ,
    Flags:  unix.EV_ADD,
}

if _, err := unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil); err != nil {
    return fmt.Errorf("failed to register server FD with kqueue: %v", err)
}
  • Ident: The identifier (file descriptor) to watch; in this case, we want to watch the file descriptor associated with our server socket.

  • Filter: The type of event to watch (unix.EVFILT_READ for read events).

  • Flags: Actions to perform (unix.EV_ADD to add the event).

The Kevent method here is used to perform actions on the kernel event queue we created before. It accepts the following parameters:

  • The file descriptor associated with kqueue

  • A slice of Kevent_t structs. This slice tells kqueue what changes you want to make. Here, you're adding a new event (like monitoring a socket for incoming connections).

  • An event list, this would be a slice where kqueue writes back any events that have occurred. We will see this in the next section.

  • A timeout, which defines how long Kevent should wait for events.

Step 7: Enter the Event Loop

Create a loop to wait for events and handle them.

events := make([]unix.Kevent_t, maxClients)

for {
    nevents, err := unix.Kevent(kq, nil, events, nil)
    if err != nil {
        if err == unix.EINTR {
            continue // Interrupted system call, retry
        }
        return fmt.Errorf("kevent error: %v", err)
    }

    for i := 0; i < nevents; i++ {
        ev := events[i]
        fd := int(ev.Ident)

        if fd == serverFD {
            // Handle new incoming connection
        } else {
            // Handle client I/O
        }
    }
}
  • unix.Kevent: This is a blocking call that waits for events until the (optional) timeout expires. It is used both to wait for events from kqueue and to alter the events monitored by the kqueue.
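
For instance, if you wanted this wait to give up after one second instead of blocking indefinitely, you could pass a unix.Timespec as the last argument. This is just a sketch; the code in this article simply passes nil to block forever.

// Wait for events for at most one second; a nil timeout blocks indefinitely.
timeout := unix.Timespec{Sec: 1, Nsec: 0}
nevents, err := unix.Kevent(kq, nil, events, &timeout)
if err != nil {
    return fmt.Errorf("kevent error: %v", err)
}
if nevents == 0 {
    log.Printf("no events occurred within the timeout")
}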

Step 8: Accept New Connections

Because we are monitoring the file descriptor associated with the server socket, kqueue returns events such as new client connection requests. When kqueue reports that the server socket is readable, we accept the pending connection from the client.

nfd, sa, err := unix.Accept(serverFD)
if err != nil {
    log.Printf("failed to accept connection: %v", err)
    continue
}
// Note: we don't defer-close nfd inside the event loop; the client FD stays
// open until the client disconnects or an error occurs (see Step 9).

// Set the new socket to non-blocking mode
if err := unix.SetNonblock(nfd, true); err != nil {
    log.Printf("failed to set non-blocking mode on client FD: %v", err)
    unix.Close(nfd)
    continue
}

// Register the new client FD with kqueue
clientKev := unix.Kevent_t{
    Ident:  uint64(nfd),
    Filter: unix.EVFILT_READ,
    Flags:  unix.EV_ADD,
}

if _, err := unix.Kevent(kq, []unix.Kevent_t{clientKev}, nil, nil); err != nil {
    log.Printf("failed to register client FD with kqueue: %v", err)
    unix.Close(nfd)
    continue
}

log.Printf("accepted new connection from %v", sa)
  • unix.Accept: Accepts a new incoming connection from a client.

  • nfd: When the server accepts a new connection, it creates a new socket for that client. nfd is the file descriptor associated with that client socket.

  • sa: The socket address of the connecting client, i.e. the peer's IP & port (see the sketch just after this list).

  • Register the client FD: When the server accepts a connection from a client, we register the file descriptor associated with the client socket in kqueue, so that we can monitor events from that client such as new data arriving or the connection terminating.
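
The sa value returned by unix.Accept is a unix.Sockaddr interface; if you want the client's IP and port in a readable form, a quick sketch (assuming an IPv4 peer) looks like this:

if sa4, ok := sa.(*unix.SockaddrInet4); ok {
    clientIP := net.IP(sa4.Addr[:]) // convert the 4-byte array into a net.IP
    log.Printf("client address: %s:%d", clientIP, sa4.Port)
}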

Step 9: Handle Client I/O

When clients send data to our server, kqueue notifies our application, and we can then take action accordingly.

buf := make([]byte, 1024)
n, err := unix.Read(fd, buf)
if err != nil {
    if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
        // No data available right now
        continue
    }
    log.Printf("failed to read from client FD %d: %v", fd, err)
    // Remove the FD from kqueue and close it
    kev := unix.Kevent_t{
        Ident:  uint64(fd),
        Filter: unix.EVFILT_READ,
        Flags:  unix.EV_DELETE,
    }
    unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil)
    unix.Close(fd)
    continue
}

// Process the data received
data := buf[:n]
log.Printf("received data from client FD %d: %s", fd, string(data))

// Echo the data back to the client (optional)
if _, err := unix.Write(fd, data); err != nil {
    log.Printf("failed to write to client FD %d: %v", fd, err)
    // Handle write error if necessary
}
  • unix.Read: Reads data from the file descriptor associated with the client's socket.

  • Handling errors: EAGAIN or EWOULDBLOCK mean no data is available right now; in non-blocking mode, this is normal. You might think that, since your application is single-threaded, if kqueue says there is data to be read, then data must be there when you attempt the read. In certain cases, however, that is not true, so it's recommended to handle these errors explicitly. You can read about it in Beej's Guide to Network Programming; here is a quote about this from the book.

Quick note to all you Linux fans out there: sometimes, in rare circumstances, Linux’s select() can return “ready-to-read” and then not actually be ready to read! This means it will block on the read() after the select() says it won’t! Why you little—! Anyway, the workaround solution is to set the O_NONBLOCK flag on the receiving socket so it errors with EWOULDBLOCK (which you can just safely ignore if it occurs).

  • Other errors: Close the connection.

Step 10: Clean Up Resources

Ensure that all file descriptors are properly closed when they are no longer needed.

  • Closing Client FDs: As shown in previous steps, remove the FD from kqueue and close it.

  • Closing Server FD and kqueue FD: Use defer statements to ensure they are closed when the function exits.

defer unix.Close(serverFD)
defer unix.Close(kq)
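
Since the "deregister from kqueue, then close" sequence appears in several places, you might factor it into a small helper like the sketch below. This is an optional refactor and not part of the complete code that follows.

// closeClient removes a client FD from the kqueue and closes it.
func closeClient(kq, fd int) {
    kev := unix.Kevent_t{
        Ident:  uint64(fd),
        Filter: unix.EVFILT_READ,
        Flags:  unix.EV_DELETE,
    }
    if _, err := unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil); err != nil {
        log.Printf("failed to deregister FD %d from kqueue: %v", fd, err)
    }
    if err := unix.Close(fd); err != nil {
        log.Printf("failed to close FD %d: %v", fd, err)
    }
}

The error-handling branches in Step 9 (and in the complete code below) could then call closeClient(kq, fd) instead of repeating the Kevent/Close pair.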

Complete Refactored Code

Here's the complete code, you can also find it in the GitHub repo I mentioned below.

package main

import (
    "fmt"
    "log"
    "net"

    "golang.org/x/sys/unix"
)

var (
    host       = "127.0.0.1" // Server IP address
    port       = 8080        // Server port
    maxClients = 20000       // Maximum number of concurrent clients
)

func RunAsyncTCPServerUnix() error {
    log.Printf("starting an asynchronous TCP server on %s:%d", host, port)

    // Create kqueue event objects to hold events
    events := make([]unix.Kevent_t, maxClients)

    // Create a socket
    serverFD, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
    if err != nil {
        return fmt.Errorf("socket creation failed: %v", err)
    }
    defer unix.Close(serverFD)

    // Set the socket to non-blocking mode
    if err := unix.SetNonblock(serverFD, true); err != nil {
        return fmt.Errorf("failed to set non-blocking mode: %v", err)
    }

    // Allow address reuse
    if err := unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
        return fmt.Errorf("failed to set SO_REUSEADDR: %v", err)
    }

    // Bind the IP & the port
    addr := &unix.SockaddrInet4{Port: port}
    copy(addr.Addr[:], net.ParseIP(host).To4())
    if err := unix.Bind(serverFD, addr); err != nil {
        return fmt.Errorf("failed to bind socket: %v", err)
    }

    // Start listening
    if err := unix.Listen(serverFD, maxClients); err != nil {
        return fmt.Errorf("failed to listen on socket: %v", err)
    }

    // Create kqueue instance
    kq, err := unix.Kqueue()
    if err != nil {
        return fmt.Errorf("failed to create kqueue: %v", err)
    }
    defer unix.Close(kq)

    // Register the serverFD with kqueue
    kev := unix.Kevent_t{
        Ident:  uint64(serverFD),
        Filter: unix.EVFILT_READ,
        Flags:  unix.EV_ADD,
    }

    if _, err := unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil); err != nil {
        return fmt.Errorf("failed to register server FD with kqueue: %v", err)
    }

    // Event loop
    for {
        nevents, err := unix.Kevent(kq, nil, events, nil)
        if err != nil {
            if err == unix.EINTR {
                continue // Interrupted system call, retry
            }
            return fmt.Errorf("kevent error: %v", err)
        }

        for i := 0; i < nevents; i++ {
            ev := events[i]
            fd := int(ev.Ident)

            if fd == serverFD {
                // Accept the incoming connection from client
                nfd, sa, err := unix.Accept(serverFD)
                if err != nil {
                    log.Printf("failed to accept connection: %v", err)
                    continue
                }

                // Set the new socket to non-blocking mode
                if err := unix.SetNonblock(nfd, true); err != nil {
                    log.Printf("failed to set non-blocking mode on client FD: %v", err)
                    unix.Close(nfd)
                    continue
                }

                // Register the new client FD with kqueue
                clientKev := unix.Kevent_t{
                    Ident:  uint64(nfd),
                    Filter: unix.EVFILT_READ,
                    Flags:  unix.EV_ADD,
                }

                if _, err := unix.Kevent(kq, []unix.Kevent_t{clientKev}, nil, nil); err != nil {
                    log.Printf("failed to register client FD with kqueue: %v", err)
                    unix.Close(nfd)
                    continue
                }

                log.Printf("accepted new connection from %v", sa)
            } else {
                // Handle client I/O
                buf := make([]byte, 1024)
                n, err := unix.Read(fd, buf)
                if err != nil {
                    if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
                        continue // No data available right now
                    }
                    log.Printf("failed to read from client FD %d: %v", fd, err)
                    // Remove the FD from kqueue and close it
                    kev := unix.Kevent_t{
                        Ident:  uint64(fd),
                        Filter: unix.EVFILT_READ,
                        Flags:  unix.EV_DELETE,
                    }
                    unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil)
                    unix.Close(fd)
                    continue
                }

                if n == 0 {
                    // Connection closed by client
                    log.Printf("client FD %d closed the connection", fd)
                    kev := unix.Kevent_t{
                        Ident:  uint64(fd),
                        Filter: unix.EVFILT_READ,
                        Flags:  unix.EV_DELETE,
                    }
                    unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil)
                    unix.Close(fd)
                    continue
                }

                // Process the data received
                data := buf[:n]
                log.Printf("received data from client FD %d: %s", fd, string(data))

                // Echo the data back to the client (optional)
                if _, err := unix.Write(fd, data); err != nil {
                    log.Printf("failed to write to client FD %d: %v", fd, err)
                    // Handle write error if necessary
                }
            }
        }
    }
}

func main() {
    if err := RunAsyncTCPServerUnix(); err != nil {
        log.Fatalf("server error: %v", err)
    }
}

How to test the above code?

netcat

netcat is a computer networking utility for reading from and writing to network connections using TCP or UDP.

  1. Open up two or more terminal windows.

  2. In each window, type nc localhost 8080, then send some messages; the server will echo them back.

    💡
    You can refer to this article to learn more on netcat.

Go Client Code

package main

import (
    "bufio"
    "fmt"
    "log"
    "net"
    "sync"
    "time"
)

const (
    serverAddress = "127.0.0.1:8080" // Server address
    numClients    = 100              // Number of concurrent clients to simulate
)

func main() {
    var wg sync.WaitGroup

    for i := 0; i < numClients; i++ {
        wg.Add(1)
        go func(clientID int) {
            defer wg.Done()
            err := runClient(clientID)
            if err != nil {
                log.Printf("Client %d error: %v", clientID, err)
            }
        }(i)
        // Optional: Sleep to stagger client connections
        time.Sleep(100 * time.Millisecond)
    }

    wg.Wait()
}

func runClient(clientID int) error {
    // Connect to the server
    conn, err := net.Dial("tcp", serverAddress)
    if err != nil {
        return fmt.Errorf("failed to connect: %v", err)
    }
    defer conn.Close()

    log.Printf("Client %d connected to %s", clientID, serverAddress)

    // Send a message to the server
    message := fmt.Sprintf("Hello from client %d", clientID)
    _, err = fmt.Fprintf(conn, "%s\n", message)
    if err != nil {
        return fmt.Errorf("failed to send data: %v", err)
    }

    time.Sleep(100 * time.Millisecond)

    // Receive a response from the server
    reply, err := bufio.NewReader(conn).ReadString('\n')
    if err != nil {
        return fmt.Errorf("failed to read response: %v", err)
    }

    log.Printf("Client %d received: %s", clientID, reply)

    return nil
}

So that’s it for this one. Hope you liked this article! If you have questions or comments, please feel free to leave them below.

You can find the implementation for kqueue, epoll & the Go client in this GitHub repository. Stay tuned for next one!

Disclaimer: The opinions expressed here are my own and do not represent the views of my employer. This blog is intended for informational purposes only.

