Implementing Event Loops in Go: A Practical Approach
Ever wondered how single threaded applications, like Redis, are able to handle thousands of clients concurrently (“perceived” concurrency)? The answer is “Event Loops“. In this article, we will dive deep into how event loops work & their implementation in GoLang.
Event Loops
An event loop is a system that continuously listens for events (like user actions or messages) and handles each one sequentially. This allows programs to manage multiple tasks smoothly and efficiently using just a single thread.
Concurrency Models for Server Architecture
There are two basic concurrency models for server architecture:
Thread-Per-Request: This model uses a separate thread to handle each incoming client request. When a new request arrives, the server creates a new thread (or utilizes one from a thread pool) to process it independently.
I/O Multiplexing: This model allows a single thread (or a limited number of threads) to monitor and manage multiple I/O streams (like network sockets, files, or pipes) simultaneously. Instead of dedicating a thread to each request, the server uses mechanisms to detect when I/O operations (like reading or writing data) are ready to be performed. This thread then takes actions as per events on these streams.
The key challenge of Thread-Per-Request model is, the application needs to be thread safe, which requires addition of locking mechanism, which in turn increases the code complexity & slows down the execution, as multiple threads can compete to acquire the lock for a critical section.
Single threaded programs don’t need to handle thread safety, hence the CPU time allocated to them, can be utilized more efficiently. Single threaded applications usually rely on I/O multiplexing to implement event loops, so that they can serve clients concurrently.
Key Concepts
User Space vs. Kernel Space
User Space: User space is the environment where user-facing applications run. This includes applications such as web servers, Chrome, text editors, and command utilities. User space applications cannot directly access the system’s hardware resources. They must make system calls to the kernel to request access to these resources.
Kernel Space: Kernel space is where the core of the operating system, the kernel, operates. The kernel is responsible for managing the system’s resources, such as the CPU, memory, and storage. It also provides system calls, which are interfaces that allow user space applications to interact with the kernel. The kernel has unrestricted access to the system’s hardware resources. This is necessary for the kernel to perform its essential tasks, such as scheduling processes, managing memory, and handling interrupts.
💡You can read more about the User-space, Kernel-space, and System Calls here.
Kernel Buffers
Receive Buffer: When data arrives from a network or other I/O source, it's stored in a kernel-managed buffer until the application reads it.
Send Buffer: Data that an application wants to send is stored in a kernel buffer before being transmitted over the network or I/O device.
File Descriptors (FDs)
Definition: Integers that uniquely identify an open file, socket, or other I/O resource within the operating system.
Usage: Applications use FDs to perform read/write operations on these resources.
I/O Multiplexing Mechanisms
kqueue(on MacOS) and epoll(on Linux) are kernel system calls for scalable I/O event notification mechanisms in an efficient manner. In simple words, you subscribe to certain kernel events and you get notified when any of those events occur. These system calls are desigend for scalable situations such as a webserver where thousands of concurrent connections are being handled by one server.
In this article, I will focus on using kqueue
, however, I will share the GitHub repo with code for implementation using epoll
.
Implementation of I/O Multiplexing in GoLang
In Go, we can use the golang.org/x/sys/unix
package to access low-level system calls like kqueue
on Unix-like systems.
Step 1: Define Server Configuration
Create a configuration struct or use variables to hold server parameters.
var (
host = "127.0.0.1" // Server IP address
port = 8080 // Server port
maxClients = 20000 // Maximum number of concurrent clients
)
Step 2: Create the Server Socket
A new socket can be created using unix.Socket
method. A socket can be thought of as an endpoint in a two-way communication channel. Socket routines create the communication channel, and the channel is used to send data between application programs either locally or over networks. Each socket within the network has a unique name associated with it called a socket descriptor—a full-word integer that designates a socket and allows application programs to refer to it when needed.
In simpler terms, a socket is like a door through which data enters and exits a program over the network. It enables inter-process communication, either on the same machine or across different machines connected via a network. Each of these sockets is assigned a file descriptor when they are created.
serverFD, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
if err != nil {
return fmt.Errorf("socket creation failed: %v", err)
}
defer unix.Close(serverFD)
unix.AF_INET: This option specifies that the socket will use IPv4 Internet protocol.
unix.SOCK_STREAM: This option provides reliable, ordered, and error-checked delivery of a stream of bytes, typically using TCP.
unix.IPPROTO_TCP: This option specifies that the socket will use the TCP protocol for communication, ensuring reliable data transmission.
Step 3: Set Socket Options
Set Non-blocking Mode
Setting a socket to non-blocking mode ensures that I/O operations return immediately without waiting. When a socket operates in this mode:
accept
: If there are no incoming connections, it immediately returns an error (EAGAIN
orEWOULDBLOCK
) instead of waiting.read
/recv
: If there's no data to read, it immediately returns an error instead of blocking.write
/send
: If the socket's buffer is full and can't accept more data, it immediately returns an error instead of waiting.
if err := unix.SetNonblock(serverFD, true); err != nil {
return fmt.Errorf("failed to set non-blocking mode: %v", err)
}
Allow Address Reuse
This is particularly useful in scenarios where you need to restart a server quickly without waiting for the operating system to release the port.
if err := unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
return fmt.Errorf("failed to set SO_REUSEADDR: %v", err)
}
Step 4: Bind and Listen
Bind the Socket
Socket binding involves linking a socket to a specific local address and port on your computer. Essentially, it tells the operating system, "Hey, my application is ready to handle any network traffic that comes to this address and port."
When you're setting up a server in network programming, binding is a crucial first step. Before your server can start accepting connections or receiving data, it needs to bind its socket to a chosen address and port. This connection point is where clients will reach out to connect or send information.
addr := &unix.SockaddrInet4{Port: port}
copy(addr.Addr[:], net.ParseIP(host).To4())
if err := unix.Bind(serverFD, addr); err != nil {
return fmt.Errorf("failed to bind socket: %v", err)
}
unix.SockaddrInet4: This struct
hold the IP address/host address (IPv4) & the port of your server.
Start Listening
if err := unix.Listen(serverFD, maxClients); err != nil {
return fmt.Errorf("failed to listen on socket: %v", err)
}
Step 5: Initialize kqueue
kq, err := unix.Kqueue()
if err != nil {
return fmt.Errorf("failed to create kqueue: %v", err)
}
defer unix.Close(kq)
- unix.Kqueue(): Creates a new kernel event queue and returns a file descriptor associated with this
kqueue
.
Step 6: Register Server FD with kqueue
Register the file descriptor associated with server socket to monitor for incoming connections. Just to reiterate, everything linux/unix is a file. Basically, when clients want to establish connection with our server, kqueue
monitors these event & notify our application to take actions accordingly.
kev := unix.Kevent_t{
Ident: uint64(serverFD),
Filter: unix.EVFILT_READ,
Flags: unix.EV_ADD,
}
if _, err := unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil); err != nil {
return fmt.Errorf("failed to register server FD with kqueue: %v", err)
}
Ident: The identifier (file descriptor) to watch, in this case we want to watch the file descriptor associated with our server.
Filter: The type of event to watch (
unix.EVFILT_READ
for read events).Flags: Actions to perform (
unix.EV_ADD
to add the event).
The Kevent
method here is used to perform certain actions on the kerned event queue, we created before. It accepts following parameters:
The file descriptor associated with kqueue
A slice of
Kevent_t
structs. This slice tells kqueue what changes you want to make. Here, you're adding a new event (like monitoring a socket for incoming connections).An event list, this would be a slice where kqueue writes back any events that have occurred. We will see this in the next section.
A timeout, that defines how long
kevent
should wait for events.
Step 7: Enter the Event Loop
Create a loop to wait for events and handle them.
events := make([]unix.Kevent_t, maxClients)
for {
nevents, err := unix.Kevent(kq, nil, events, nil)
if err != nil {
if err == unix.EINTR {
continue // Interrupted system call, retry
}
return fmt.Errorf("kevent error: %v", err)
}
for i := 0; i < nevents; i++ {
ev := events[i]
fd := int(ev.Ident)
if fd == serverFD {
// Handle new incoming connection
} else {
// Handle client I/O
}
}
}
- unix.Kevent: This is a blocking call which waits for events until getting timed out (optional). This method is used to wait for events from
kqueue
as well alter events to be monitored by thekqueue
.
Step 8: Accept New Connections
As we are monitoring the file descriptor associated with the server socket, kqueue
returns events such as new client connection request. When the server socket is ready & clients request to connect with the server on the server socket, then accept connections from client.
nfd, sa, err := unix.Accept(serverFD)
if err != nil {
log.Printf("failed to accept connection: %v", err)
continue
}
defer unix.Close(nfd) // Ensure the FD is closed when no longer needed
// Set the new socket to non-blocking mode
if err := unix.SetNonblock(nfd, true); err != nil {
log.Printf("failed to set non-blocking mode on client FD: %v", err)
unix.Close(nfd)
continue
}
// Register the new client FD with kqueue
clientKev := unix.Kevent_t{
Ident: uint64(nfd),
Filter: unix.EVFILT_READ,
Flags: unix.EV_ADD,
}
if _, err := unix.Kevent(kq, []unix.Kevent_t{clientKev}, nil, nil); err != nil {
log.Printf("failed to register client FD with kqueue: %v", err)
unix.Close(nfd)
continue
}
log.Printf("accepted new connection from %v", sa)
unix.Accept: Method to accept new incoming connection from clients.
nfd: When server accepts a new connection, it creates a new socket for that client. This
nfd
is file descriptor associated with that client socket.sa: This is address of the socket the client is connected to.
Register the client FD: When server accepts connection from client, we register the file descriptor associated the client socket in
kqueue
, so that we can monitor events from the client such asnew data sent
,connection terminated
, etc.
Step 9: Handle Client I/O
When clients send some data to our server, the kqueue
notifies our application, then we can take actions accordingly.
buf := make([]byte, 1024)
n, err := unix.Read(fd, buf)
if err != nil {
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
// No data available right now
continue
}
log.Printf("failed to read from client FD %d: %v", fd, err)
// Remove the FD from kqueue and close it
kev := unix.Kevent_t{
Ident: uint64(fd),
Filter: unix.EVFILT_READ,
Flags: unix.EV_DELETE,
}
unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil)
unix.Close(fd)
continue
}
// Process the data received
data := buf[:n]
log.Printf("received data from client FD %d: %s", fd, string(data))
// Echo the data back to the client (optional)
if _, err := unix.Write(fd, data); err != nil {
log.Printf("failed to write to client FD %d: %v", fd, err)
// Handle write error if necessary
}
unix.Read: Reads data from the file descriptor associated with socket corresponding to the client.
Handling errors: EAGAIN or EWOULDBLOCK: No data available; in non-blocking mode, this is normal. You may think that, as your application is single threaded, if
kqueue
is saying there is some data to be read & when you attempt to read, then there has to be some data. But in certain cases, it’s possible that, it’s not the case, hence it’s recommended to handle these errors explicitly. You can read about it in Beej's Guide to Network Programming, I am adding a quote about this from this book.
Quick note to all you Linux fans out there: sometimes, in rare circumstances, Linux’s
select()
can return “ready-to-read” and then not actually be ready to read! This means it will block on theread()
after theselect()
says it won’t! Why you little—! Anyway, the workaround solution is to set theO_NONBLOCK
flag on the receiving socket so it errors withEWOULDBLOCK
(which you can just safely ignore if it occurs).
- Other errors: Close the connection.
Step 10: Clean Up Resources
Ensure that all file descriptors are properly closed when they are no longer needed.
Closing Client FDs: As shown in previous steps, remove the FD from kqueue and close it.
Closing Server FD and kqueue FD: Use
defer
statements to ensure they are closed when the function exits.
defer unix.Close(serverFD)
defer unix.Close(kq)
Complete Refactored Code
Here's the complete code, you can also find it in the GitHub repo I mentioned below.
package main
import (
"fmt"
"golang.org/x/sys/unix"
"log"
"net"
)
var (
host = "127.0.0.1" // Server IP address
port = 8080 // Server port
maxClients = 20000 // Maximum number of concurrent clients
)
func RunAsyncTCPServerUnix() error {
log.Printf("starting an asynchronous TCP server on %s:%d", host, port)
// Create kqueue event objects to hold events
events := make([]unix.Kevent_t, maxClients)
// Create a socket
serverFD, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
if err != nil {
return fmt.Errorf("socket creation failed: %v", err)
}
defer unix.Close(serverFD)
// Set the socket to non-blocking mode
if err := unix.SetNonblock(serverFD, true); err != nil {
return fmt.Errorf("failed to set non-blocking mode: %v", err)
}
// Allow address reuse
if err := unix.SetsockoptInt(serverFD, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
return fmt.Errorf("failed to set SO_REUSEADDR: %v", err)
}
// Bind the IP & the port
addr := &unix.SockaddrInet4{Port: port}
copy(addr.Addr[:], net.ParseIP(host).To4())
if err := unix.Bind(serverFD, addr); err != nil {
return fmt.Errorf("failed to bind socket: %v", err)
}
// Start listening
if err := unix.Listen(serverFD, maxClients); err != nil {
return fmt.Errorf("failed to listen on socket: %v", err)
}
// Create kqueue instance
kq, err := unix.Kqueue()
if err != nil {
return fmt.Errorf("failed to create kqueue: %v", err)
}
defer unix.Close(kq)
// Register the serverFD with kqueue
kev := unix.Kevent_t{
Ident: uint64(serverFD),
Filter: unix.EVFILT_READ,
Flags: unix.EV_ADD,
}
if _, err := unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil); err != nil {
return fmt.Errorf("failed to register server FD with kqueue: %v", err)
}
// Event loop
for {
nevents, err := unix.Kevent(kq, nil, events, nil)
if err != nil {
if err == unix.EINTR {
continue // Interrupted system call, retry
}
return fmt.Errorf("kevent error: %v", err)
}
for i := 0; i < nevents; i++ {
ev := events[i]
fd := int(ev.Ident)
if fd == serverFD {
// Accept the incoming connection from client
nfd, sa, err := unix.Accept(serverFD)
if err != nil {
log.Printf("failed to accept connection: %v", err)
continue
}
// Set the new socket to non-blocking mode
if err := unix.SetNonblock(nfd, true); err != nil {
log.Printf("failed to set non-blocking mode on client FD: %v", err)
unix.Close(nfd)
continue
}
// Register the new client FD with kqueue
clientKev := unix.Kevent_t{
Ident: uint64(nfd),
Filter: unix.EVFILT_READ,
Flags: unix.EV_ADD,
}
if _, err := unix.Kevent(kq, []unix.Kevent_t{clientKev}, nil, nil); err != nil {
log.Printf("failed to register client FD with kqueue: %v", err)
unix.Close(nfd)
continue
}
log.Printf("accepted new connection from %v", sa)
} else {
// Handle client I/O
buf := make([]byte, 1024)
n, err := unix.Read(fd, buf)
if err != nil {
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {
continue // No data available right now
}
log.Printf("failed to read from client FD %d: %v", fd, err)
// Remove the FD from kqueue and close it
kev := unix.Kevent_t{
Ident: uint64(fd),
Filter: unix.EVFILT_READ,
Flags: unix.EV_DELETE,
}
unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil)
unix.Close(fd)
continue
}
if n == 0 {
// Connection closed by client
log.Printf("client FD %d closed the connection", fd)
kev := unix.Kevent_t{
Ident: uint64(fd),
Filter: unix.EVFILT_READ,
Flags: unix.EV_DELETE,
}
unix.Kevent(kq, []unix.Kevent_t{kev}, nil, nil)
unix.Close(fd)
continue
}
// Process the data received
data := buf[:n]
log.Printf("received data from client FD %d: %s", fd, string(data))
// Echo the data back to the client (optional)
if _, err := unix.Write(fd, data); err != nil {
log.Printf("failed to write to client FD %d: %v", fd, err)
// Handle write error if necessary
}
}
}
}
}
func main() {
if err := RunAsyncTCPServerUnix(); err != nil {
log.Fatalf("server error: %v", err)
}
}
How to test above code?
netcat
netcat is a computer networking utility for reading from and writing to network connections using TCP or UDP.
Open up two or more terminal windows.
Type
nc localhost 8080
💡You can refer to this article to learn more on netcat.Go Client Code
package main import ( "bufio" "fmt" "log" "net" "sync" "time" ) const ( serverAddress = "127.0.0.1:8080" // Server address numClients = 100 // Number of concurrent clients to simulate ) func main() { var wg sync.WaitGroup for i := 0; i < numClients; i++ { wg.Add(1) go func(clientID int) { defer wg.Done() err := runClient(clientID) if err != nil { log.Printf("Client %d error: %v", clientID, err) } }(i) // Optional: Sleep to stagger client connections time.Sleep(100 * time.Millisecond) } wg.Wait() } func runClient(clientID int) error { // Connect to the server conn, err := net.Dial("tcp", serverAddress) if err != nil { return fmt.Errorf("failed to connect: %v", err) } defer conn.Close() log.Printf("Client %d connected to %s", clientID, serverAddress) // Send a message to the server message := fmt.Sprintf("Hello from client %d", clientID) _, err = fmt.Fprintf(conn, message+"\n") if err != nil { return fmt.Errorf("failed to send data: %v", err) } time.Sleep(100 * time.Millisecond) // Receive a response from the server reply, err := bufio.NewReader(conn).ReadString('\n') if err != nil { return fmt.Errorf("failed to read response: %v", err) } log.Printf("Client %d received: %s", clientID, reply) return nil }
So that’s it for this one. Hope you liked this article! If you have questions/comments, then please feel free to comment on this article.
You can find the implementation for kqueue
, epoll
& the Go client in this GitHub repository. Stay tuned for next one!
Disclaimer: The opinions expressed here are my own and do not represent the views of my employer. This blog is intended for informational purposes only.
Subscribe to my newsletter
Read articles from Aniket Mahangare directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Aniket Mahangare
Aniket Mahangare
I am a Software Engineer in the Platform team at Uber India, deeply passionate about System Architecture, Database Internals, and Advanced Algorithms. I thrive on diving into intricate engineering details and bringing complex solutions to life. In my leisure time, I enjoy reading insightful articles, experimenting with new ideas, and sharing my knowledge through writing. This blog is a space where I document my learning journey, project experiences, and technical insights. Thank you for visiting—I hope you enjoy my posts!