Scaling PostgreSQL with Kubernetes

Table of contents
- A case for vertical scaling
- Prerequisite
- Replication
- Partitioning
- Sharding with replication
- Types of sharding
- Setup StackGres and enable load balancer
- Define CRD for Sharded Cluster
- Apply Citus CRD
- Get credentials
- Create some distributed tables
- Insert some data
- See how shards are spread
- Find which node hosts which shard
- Find which shard has a specific row
- Join distributed-distributed (co-located)
- Join distributed-reference
- References

A case for vertical scaling
If you have read any article or book on system design, then you probably know what vertical and horizontal scaling are and the benefits of horizontal scaling. Before I explain how to set up proper horizontal scaling with Postgres, let me make a case for when you should not try it.
Simplicity: A single-node database runs out of the box, although I recommend running PGTune for a quick preset or visiting postconf for a full breakdown.
Easier backup and recovery: No need to think about state across replicas when creating or restoring backups.
No network overhead: Especially noticeable with write-heavy operations.
A temporary fix: If you need a fix right now, this will provide instant relief.
Prerequisite
Make sure you have the following tools installed.
Following this guide requires a basic understanding of Kubernetes, CRDs, and Helm. Nothing deep: a quick AI summary will suffice.
Replication
Replication means keeping multiple copies of data on multiple machines connected via network. Here is why you might want to do that:
It keeps your data close to your users.
It acts as a hot standby if the leader goes down.
It helps with scaling if most of your workload is read operations (which is the case for most OLTP systems).
Here pg-pool acts as a load balancer: it distributes read requests evenly among the followers and sends mutation requests to the leader. Notice that the leader periodically syncs its WAL with its followers.
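Once the cluster described below is up, you can see this leader/follower relationship from inside Postgres itself; pg_stat_replication is a standard Postgres view (nothing StackGres-specific), with one row per connected follower:
-- Run on the leader: one row per connected follower with its state and lag
SELECT application_name, state, sync_state, replay_lag
FROM pg_stat_replication;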
Setup StackGres and enable load balancer
minikube addons enable metallb
minikube tunnel
helm install stackgres-operator stackgres-charts/stackgres-operator \
--namespace stackgres-operator \
--create-namespace
Define CRD for replicated cluster
apiVersion: stackgres.io/v1
kind: SGCluster
metadata:
  name: cluster
spec:
  instances: 3 # 1 primary + 2 replicas
  postgres:
    version: "15"
  pods:
    persistentVolume:
      size: "1Gi"
  profile: development
  postgresServices:
    primary:
      type: LoadBalancer
    replicas:
      type: LoadBalancer
Apply the CRD
kubectl apply -f ./replication.yaml
kubectl get pods -w
Get credentials
PG_PASSWORD=$(kubectl -n default get secret cluster --template '{{ printf "%s" (index .data "superuser-password" | base64decode) }}')
echo "The superuser password is: $PG_PASSWORD"
See who is who
kubectl exec -it cluster-0 -c patroni -- patronictl list
Kill the primary
kubectl delete pod cluster-0
See who is in charge now
Patroni should have elected a new leader by now.
kubectl exec -it cluster-1 -c patroni -- patronictl list
Tell something only to the primary
PRIMARY=$(kubectl exec -it cluster-1 -c patroni -- patronictl list | grep Leader | awk '{print $2}')
kubectl exec -it $PRIMARY -c patroni -- psql -U postgres -c "CREATE TABLE replication_test_table (id SERIAL PRIMARY KEY, data TEXT);"
kubectl exec -it $PRIMARY -c patroni -- psql -U postgres -c "INSERT INTO replication_test_table (data) VALUES ('Spread the word about our lord savior PostgreSQL!');"
The primary tells its followers
kubectl exec -it cluster-0 -c patroni -- psql -U postgres -c "SELECT * FROM replication_test_table;"
kubectl exec -it cluster-1 -c patroni -- psql -U postgres -c "SELECT * FROM replication_test_table;"
kubectl exec -it cluster-2 -c patroni -- psql -U postgres -c "SELECT * FROM replication_test_table;"
As you can see, the word spread quickly. This is possible because StackGres uses Patroni under the hood to coordinate all the replication.
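If you ever want to double-check a node's role from inside Postgres rather than through Patroni, pg_is_in_recovery() is a built-in function you can run through the same kubectl exec / psql pattern used above:
-- Returns true on a follower (replica), false on the leader
SELECT pg_is_in_recovery();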
Partitioning
Partitioning splits the data (a table in our case) into smaller, more manageable parts. This is done within a single database instance, and Postgres supports it out of the box. Partitions are defined at the data definition layer, and keeping multiple replicas makes each partition highly available. It works best for time-series data, logs, or region-based segmentation.
Types of Partitioning
Range Partitioning – Data is partitioned based on value ranges (e.g., date ranges).
List Partitioning – Partitioning based on a list of values (e.g., regions or categories).
Hash Partitioning – Data is distributed using a hash function (e.g., MOD(user_id, 4)).
The following code creates a table orders and derives partitions from it using range, list, and hash partitioning in a hierarchical way: the orders table is split by year, each year is further split by region, and each region is finally split by hash.
Notice that only hash-based partitioning guarantees that all partitions are roughly the same size.
Setup StackGres and enable load balancer
helm install stackgres-operator stackgres-charts/stackgres-operator \
--namespace stackgres-operator \
--create-namespace
minikube addons enable metallb
minikube tunnel
Get credentials
PG_PASSWORD=$(kubectl -n default get secret cluster --template '{{ printf "%s" (index .data "superuser-password" | base64decode) }}')
echo "The superuser password is: $PG_PASSWORD"
Your database should now be available at
postgresql://postgres:<password>@localhost:5432
Now open an SQL editor like pgAdmin and run the following.
-- Parent table
CREATE TABLE orders (
order_id INT,
customer_id INT,
order_date DATE,
region TEXT,
amount INT,
PRIMARY KEY (order_id, order_date, region, customer_id)
) PARTITION BY RANGE (order_date);
-- Range: Year 2024
CREATE TABLE orders_2024 PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01')
PARTITION BY LIST (region);
-- Range: Year 2025
CREATE TABLE orders_2025 PARTITION OF orders
FOR VALUES FROM ('2025-01-01') TO ('2026-01-01')
PARTITION BY LIST (region);
-- 2024 - US region
CREATE TABLE orders_2024_us PARTITION OF orders_2024
FOR VALUES IN ('US')
PARTITION BY HASH (customer_id);
-- 2024 - EU region
CREATE TABLE orders_2024_eu PARTITION OF orders_2024
FOR VALUES IN ('EU')
PARTITION BY HASH (customer_id);
-- 2024 - US - Hash partitions
CREATE TABLE orders_2024_us_0 PARTITION OF orders_2024_us FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE TABLE orders_2024_us_1 PARTITION OF orders_2024_us FOR VALUES WITH (MODULUS 2, REMAINDER 1);
-- 2024 - EU - Hash partitions
CREATE TABLE orders_2024_eu_0 PARTITION OF orders_2024_eu FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE TABLE orders_2024_eu_1 PARTITION OF orders_2024_eu FOR VALUES WITH (MODULUS 2, REMAINDER 1);
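To double-check the hierarchy you just built, Postgres 12+ ships pg_partition_tree, which lists every partition together with its parent and nesting level:
-- Show the full partition tree rooted at orders
SELECT relid, parentrelid, isleaf, level
FROM pg_partition_tree('orders');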
Bulk insert synthetic data
-- Generate 1000 random orders
INSERT INTO orders (order_id, customer_id, order_date, region, amount)
SELECT
-- Generate order IDs between 1000 and 9999
1000 + floor(random() * 9000)::int AS order_id,
-- Generate customer IDs between 1000 and 9999
1000 + floor(random() * 9000)::int AS customer_id,
-- Generate dates in 2024 (to fit the 2024 partition)
DATE '2024-01-01' + (floor(random() * 366)::int * INTERVAL '1 day') AS order_date,
-- Randomly select region
(ARRAY['US', 'EU'])[1 + floor(random() * 2)::int] AS region,
-- Generate random amounts between 10 and 1000
10 + floor(random() * 990)::int AS amount
FROM
generate_series(1, 1000) AS i; -- 1k rows
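To see where those rows actually landed, group by tableoid, which resolves to the leaf partition storing each row; the two hash partitions under each region should come out roughly equal in size:
-- Row count per leaf partition
SELECT tableoid::regclass AS partition, count(*) AS row_count
FROM orders
GROUP BY 1
ORDER BY 1;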
SELECT * FROM orders
WHERE order_date = '2024-06-10'
AND region = 'US';
Querying orders does not require you to know which partition the rows live in; the planner routes the query to the right partitions automatically.
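You can confirm that the planner prunes the non-matching partitions by looking at the query plan; only the orders_2024_us hash partitions should be scanned:
-- Partition pruning: only the orders_2024_us_* leaves should appear in the plan
EXPLAIN
SELECT * FROM orders
WHERE order_date = '2024-06-10'
  AND region = 'US';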
Sharding with replication
Sharding splits a large database into smaller pieces called shards. Each shard is then replicated across multiple machines so that our database can continue to function even if we lose a few machines. Routing queries to the proper shard is done by a coordinator, and just like in the replication example, pg-pool does the load balancing within each shard.
Types of sharding
Row based: Think of it like splitting a very thick book into many volumes (shards) and creating a new volume just to keep track of the table of contents (coordinator). Think of a table whose schema is simple but whose row count and write volume have exploded. With this method, both read and write throughput can scale shard by shard as needed.
Schema based: Just like last time we are still splitting the book, but this time we take a few related chapters and turn them into a book about a sub-topic. Think of how a very thick physics textbook can be split into Optics, Thermodynamics, and Quantum Mechanics. Similarly, if a table has a large number of columns and you rarely need all of them in a single query, you can split it so that related columns are placed together.
Notice the resiliency of this architecture: we have multiple replicas not only for the shards but also for the coordinator. As long as a minimum of 3 machines run our sharded cluster, the failure of a single machine will not bring down our database.
Setup StackGres and enable load balancer
helm install stackgres-operator stackgres-charts/stackgres-operator \
--namespace stackgres-operator \
--create-namespace
minikube addons enable metallb
minikube tunnel
Define CRD for Sharded Cluster
# shard.yaml
apiVersion: stackgres.io/v1alpha1
kind: SGShardedCluster
metadata:
  name: cluster
spec:
  type: citus
  database: mydatabase
  postgres:
    version: 'latest'
  coordinator:
    instances: 2 # Number of coordinator instances
    pods:
      persistentVolume:
        size: '1Gi'
  shards:
    clusters: 3 # Number of shards
    instancesPerCluster: 3 # 1 primary and 2 replicas
    pods:
      persistentVolume:
        size: '1Gi'
  postgresServices:
    coordinator:
      primary:
        type: LoadBalancer
  profile: development
Apply Citus CRD
kubectl apply -f ./shard.yaml
Get credentials
PG_PASSWORD=$(kubectl -n default get secret cluster --template '{{ printf "%s" (index .data "superuser-password" | base64decode) }}')
echo "The superuser password is: $PG_PASSWORD"
Your database should now be available at
postgresql://postgres:<password>@localhost:5432
Now open an SQL editor like pgAdmin and run the following.
Create some distributed tables
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name TEXT
);
SELECT create_distributed_table('users', 'id');
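-- orders will be distributed by user_id so that a user's orders land on the
-- same shard as the matching users row, keeping the later join co-located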
CREATE TABLE orders (
id BIGINT,
user_id BIGINT,
product_id BIGINT,
amount INTEGER,
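-- Citus requires the distribution column (user_id) to be part of any
-- primary key or unique constraint on a distributed table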
PRIMARY KEY (user_id, id)
);
SELECT create_distributed_table('orders', 'user_id');
CREATE TABLE products (
id BIGINT PRIMARY KEY,
name TEXT,
price NUMERIC
);
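-- A reference table is copied in full to every node, which is why it suits
-- small lookup tables like products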
SELECT create_reference_table('products');
Insert some data
INSERT INTO users (id, name) VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');
INSERT INTO orders (id, user_id, product_id, amount) VALUES
(1, 1, 1, 2),
(2, 1, 2, 3),
(3, 2, 1, 1),
(4, 3, 3, 5);
INSERT INTO products (id, name, price) VALUES
(1, 'Product A', 10.00),
(2, 'Product B', 20.00),
(3, 'Product C', 30.00);
See how shards are spread
SELECT * FROM citus_shards
WHERE table_name = 'orders'::regclass;
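For a per-table summary rather than per-shard detail, recent Citus versions also provide a citus_tables view showing the distribution column and shard count:
SELECT table_name, citus_table_type, distribution_column, shard_count
FROM citus_tables;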
Find which node hosts which shard
SELECT
s.shardid,
n.nodename,
n.nodeport
FROM pg_dist_shard s
JOIN pg_dist_shard_placement p ON s.shardid = p.shardid
JOIN pg_dist_node n ON p.nodename = n.nodename AND p.nodeport = n.nodeport
WHERE s.logicalrelid = 'orders'::regclass;
Find which shard has a specific row
SELECT get_shard_id_for_distribution_column('orders', 1);
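To go one step further and find the worker that actually holds that row, feed the shard id into the placement catalog used above:
-- Worker holding the shard that stores orders rows with user_id = 1
SELECT nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid = get_shard_id_for_distribution_column('orders', 1);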
Join distributed-distributed (co-located)
SELECT
o.id AS order_id,
u.name AS customer,
o.amount
FROM orders o
JOIN users u ON o.user_id = u.id;
This is efficient because orders and users are sharded on the same key (orders.user_id and users.id), so the join can run locally on each shard without moving data between nodes.
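If you want to see this for yourself, EXPLAIN the join on the coordinator; the work should be pushed down to the shards as parallel tasks rather than a repartition step (the exact plan wording varies by Citus version):
EXPLAIN
SELECT o.id, u.name, o.amount
FROM orders o
JOIN users u ON o.user_id = u.id;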
Join distributed-reference
SELECT
o.id AS order_id,
u.name AS customer,
p.name AS product,
o.amount
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id;
This works well because products is replicated across all nodes.
References