Citus is a PostgreSQL extension designed to shard and distribute data across multiple machines. It offers features like distributed tables, reference tables, schema-based sharding, and columnar storage.
We have already covered the basics of Citus and the initial setup part in some earlier blog posts:
How To Scale a Single-Host PostgreSQL Database With Citus
Scalable Solutions with Percona Distribution for PostgreSQL (Part 2): Using Citus
In this discussion, we will focus on integrating the Citus extension into the Patroni HA model and understanding how it behaves.
For demo purposes, we will use the topology below, which has one coordinator node [Coordinator] and two worker nodes [Worker1 and Worker2], along with their replicas [Worker1 Replica and Worker2 Replica], all running under Patroni.
Coordinator        172.31.89.51   pg0

Worker Group 1
Worker1            172.31.95.174  pg1
Worker1 Replica    172.31.87.156  pg3

Worker Group 2
Worker2            172.31.28.124  pg2
Worker2 Replica    172.31.84.95   pg4
Let’s walk through the steps required to achieve the full setup.
1) First, we need to install the Citus extension on all Patroni nodes. I am using PostgreSQL 16, so I chose the package accordingly. The installation steps for all other distributions can be found here: https://docs.citusdata.com/en/stable/installation/multi_node.html#multi-node-citus.
shell> sudo yum install -y citus130_16
2) Next, we need to define the Citus configuration inside the [patroni.yaml] file for each node.
citus:
  group: X
  database: citus
group: Defines the Citus group for the node. The coordinator gets its own group, and each worker primary shares a group with its replicas.
database: Defines the Citus database that holds the distributed and other metadata tables. The Citus extension is also created automatically in this database.
Coordinator:
citus:
  group: 0
  database: citus

Worker1:

citus:
  group: 1
  database: citus

Worker1 Replica:

citus:
  group: 1
  database: citus

Worker2:

citus:
  group: 2
  database: citus

Worker2 Replica:

citus:
  group: 2
  database: citus
Once we restart the Patroni service on all nodes, Patroni will automatically create the Citus extension and take care of the remaining configuration.
shell> systemctl restart patroni
To add any new database, we have to create it alongside the Citus extension on all nodes.
- Execute on all coordinator and worker nodes.
psql=# create database citus_new;
psql=# \c citus_new
citus_new=# create extension citus;
- Register nodes in Coordinator.
citus_new=# SELECT citus_set_coordinator_host('HOST', PORT);
citus_new=# SELECT citus_add_node('HOST', PORT);
citus_new=# SELECT citus_add_node('HOST', PORT);
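For this demo topology, where pg0 is the coordinator and pg1/pg2 are the worker primaries (all on port 5432, as shown in the topology above), the calls would look roughly like this:

citus_new=# SELECT citus_set_coordinator_host('pg0', 5432);
citus_new=# SELECT citus_add_node('pg1', 5432);
citus_new=# SELECT citus_add_node('pg2', 5432);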
- If we have any secondary nodes, then we need to register them as well for each worker node.
citus_new=# select * from master_add_secondary_node('NEW NODE', PORT, 'PRIMARY NODE', PORT);
citus_new=# select * from master_add_secondary_node('NEW NODE', PORT, 'PRIMARY NODE', PORT);
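With pg3 and pg4 acting as the secondaries of pg1 and pg2 in this demo, the registration would look roughly like this:

citus_new=# SELECT * FROM master_add_secondary_node('pg3', 5432, 'pg1', 5432);
citus_new=# SELECT * FROM master_add_secondary_node('pg4', 5432, 'pg2', 5432);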
- Finally, create and distribute the table.
citus_new=# create table t51 (id integer primary key, t_name text);
citus_new=# SELECT create_distributed_table('t51', 'id');
Alternatively, for each leader role, we can apply dynamic configuration changes and reload/restart the services accordingly.
E.g.,
shell> /usr/local/bin/patronictl -c /etc/patroni.yml edit-config citus
shell> /usr/local/bin/patronictl --config-file /etc/patroni.yml reload citus
Patroni takes care of integrating the coordinator/worker nodes automatically by applying the additional changes below (a quick way to verify them follows the list).
- The Citus extension will automatically be added to shared_preload_libraries.
- Patroni will set [bootstrap.dcs.synchronous_mode] to quorum.
- Database “citus” will also automatically be created.
- The coordinator primary node will automatically discover worker primary nodes and add them to the pg_dist_node table using the citus_add_node() function.
- Patroni will also update the status in pg_dist_node whenever a failover/switchover is triggered on the coordinator or a worker.
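As a quick sanity check (a minimal sketch; the exact output depends on your environment), we can inspect the dynamic configuration Patroni stores in the DCS and confirm the library is loaded:

shell> /usr/local/bin/patronictl -c /etc/patroni.yml show-config citus
shell> psql -d citus -c "SHOW shared_preload_libraries;"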
The worker nodes will reflect the “citus” database and extension installation.
citus=# \l
                                                  List of databases
   Name    |  Owner   | Encoding | Locale Provider |   Collate   |    Ctype    | ICU Locale | ICU Rules | Access privileges
-----------+----------+----------+-----------------+-------------+-------------+------------+-----------+-------------------
 citus     | postgres | UTF8     | libc            | en_US.UTF-8 | en_US.UTF-8 |            |           |
citus=# \dx
                        List of installed extensions
      Name      | Version |   Schema   |         Description
----------------+---------+------------+------------------------------
 citus          | 13.0-1  | pg_catalog | citus distributed database
 citus_columnar | 11.3-1  | pg_catalog | citus Columnar extension
Below is the final status we can see in the “patronictl list” output.
[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml list citus
+ citus cluster: citus ---------+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
|     0 | Coordinator     | 172.31.89.51  | Leader         | running   | 14 |           |
|     1 | Worker1         | 172.31.95.174 | Leader         | running   | 17 |           |
|     1 | Worker1 Replica | 172.31.87.156 | Quorum Standby | streaming | 17 |         0 |
|     2 | Worker2         | 172.31.28.124 | Leader         | running   | 15 |           |
|     2 | Worker2 Replica | 172.31.84.95  | Quorum Standby | streaming | 15 |         0 |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
The users required for Patroni management and replication work the same way as in native Patroni deployments. So we need to make sure the superuser “postgres” and the replication user “replicator” are accessible from the other Patroni nodes (see the client-authentication note after the templates below).
E.g.,
Patroni template from [coordinator] node:
scope: citus
name: Coordinator

restapi:
  listen: 0.0.0.0:8008
  connect_address: 172.31.89.51:8008

etcd3:
  hosts:
    - 172.31.89.51:2379
    - 172.31.95.174:2379
    - 172.31.28.124:2379

postgresql:
  listen: 0.0.0.0:5432
  connect_address: 172.31.89.51:5432
  data_dir: /var/lib/pgsql/16/data
  bin_dir: /usr/pgsql-16/bin
  pgpass: /tmp/pgpass0
  authentication:
    superuser:
      username: postgres
      password: postgres_pass
    replication:
      username: replicator
      password: repl_pass
  parameters:
    wal_level: replica
    hot_standby: "on"
    max_wal_senders: 10
    max_replication_slots: 10
    wal_keep_size: 64
    shared_preload_libraries: citus
    password_encryption: scram-sha-256
    ssl: 'on'
    ssl_cert_file: '/var/lib/pgsql/16/data/server.crt'
    ssl_key_file: '/var/lib/pgsql/16/data/server.key'
  pg_hba:
    - hostssl all postgres 127.0.0.1/32 scram-sha-256
    - hostssl all postgres ::1/128 scram-sha-256
    - local all postgres scram-sha-256
    - hostssl all postgres 172.31.89.51/32 scram-sha-256
    - hostssl all postgres 172.31.87.156/32 scram-sha-256
    - hostssl all postgres 172.31.95.174/32 scram-sha-256
    - hostssl all postgres 172.31.28.124/32 scram-sha-256
    - hostssl all postgres 172.31.84.95/32 scram-sha-256
    - hostssl replication replicator 127.0.0.1/32 scram-sha-256
    - hostssl replication replicator ::1/128 scram-sha-256
    - local replication replicator scram-sha-256
    - hostssl replication replicator 172.31.89.51/32 scram-sha-256
    - hostssl replication replicator 172.31.87.156/32 scram-sha-256
    - hostssl replication replicator 172.31.95.174/32 scram-sha-256
    - hostssl replication replicator 172.31.28.124/32 scram-sha-256
    - hostssl replication replicator 172.31.84.95/32 scram-sha-256

tags:
  nofailover: false
  noloadbalance: false

citus:
  group: 0
  database: citus
Patroni template from one of the [worker] nodes:
scope: citus
name: Worker1

restapi:
  listen: 0.0.0.0:8008
  connect_address: 172.31.95.174:8008

etcd3:
  hosts:
    - 172.31.89.51:2379
    - 172.31.95.174:2379
    - 172.31.28.124:2379

postgresql:
  listen: 0.0.0.0:5432
  connect_address: 172.31.95.174:5432
  data_dir: /var/lib/pgsql/16/data
  bin_dir: /usr/pgsql-16/bin
  pgpass: /tmp/pgpass0
  authentication:
    superuser:
      username: postgres
      password: postgres_pass
    replication:
      username: replicator
      password: repl_pass
  parameters:
    wal_level: replica
    hot_standby: "on"
    max_wal_senders: 10
    max_replication_slots: 10
    wal_keep_size: 64
    shared_preload_libraries: citus
    ssl: 'on'
    ssl_cert_file: '/var/lib/pgsql/16/data/server.crt'
    ssl_key_file: '/var/lib/pgsql/16/data/server.key'
  pg_hba:
    - hostssl all postgres 127.0.0.1/32 scram-sha-256
    - hostssl all postgres ::1/128 scram-sha-256
    - local all postgres scram-sha-256
    - hostssl all postgres 172.31.89.51/32 scram-sha-256
    - hostssl all postgres 172.31.87.156/32 scram-sha-256
    - hostssl all postgres 172.31.95.174/32 scram-sha-256
    - hostssl all postgres 172.31.28.124/32 scram-sha-256
    - hostssl all postgres 172.31.84.95/32 scram-sha-256
    - hostssl replication replicator 127.0.0.1/32 scram-sha-256
    - hostssl replication replicator ::1/128 scram-sha-256
    - local replication replicator scram-sha-256
    - hostssl replication replicator 172.31.89.51/32 scram-sha-256
    - hostssl replication replicator 172.31.87.156/32 scram-sha-256
    - hostssl replication replicator 172.31.95.174/32 scram-sha-256
    - hostssl replication replicator 172.31.28.124/32 scram-sha-256
    - hostssl replication replicator 172.31.84.95/32 scram-sha-256

tags:
  nofailover: false
  noloadbalance: false

citus:
  group: 1
  database: citus
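A note on client authentication: since the coordinator and workers also open regular client connections to each other (for example, when the coordinator queries shard placements), the “postgres” user needs a password source on every node. Below is a minimal, illustrative sketch of one common approach, a .pgpass file for the postgres OS user; the file location is an assumption of this demo, and the password value is taken from the templates above. The “fe_sendauth: no password supplied” warning shown later in the citus_shards output is a typical symptom when such an entry is missing.

shell> cat <<'EOF' >> ~postgres/.pgpass
# hostname:port:database:username:password
*:5432:*:postgres:postgres_pass
EOF
shell> chmod 600 ~postgres/.pgpass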
3) Verifying the node details from the “Coordinator” and “Worker” nodes.
Coordinator:
citus=# SELECT * FROM pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole  | nodecluster | metadatasynced | shouldhaveshards
--------+---------+----------+----------+----------+-------------+----------+-----------+-------------+----------------+------------------
      1 |       0 | pg0      |     5432 | default  | t           | t        | primary   | default     | t              | f
     21 |       2 | pg4      |     5432 | default  | f           | t        | secondary | default     | f              | t
      3 |       2 | pg2      |     5432 | default  | t           | t        | primary   | default     | t              | t
      2 |       1 | pg1      |     5432 | default  | t           | t        | primary   | default     | t              | t
     18 |       1 | pg3      |     5432 | default  | f           | t        | secondary | citus       | f              | t
(5 rows)
Worker:
citus=# SELECT * FROM pg_catalog.pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole  | nodecluster | metadatasynced | shouldhaveshards
--------+---------+----------+----------+----------+-------------+----------+-----------+-------------+----------------+------------------
      1 |       0 | pg0      |     5432 | default  | t           | t        | primary   | default     | t              | f
      2 |       1 | pg1      |     5432 | default  | t           | t        | primary   | default     | t              | t
      3 |       2 | pg2      |     5432 | default  | t           | t        | primary   | default     | t              | t
     18 |       1 | pg3      |     5432 | default  | f           | t        | secondary | citus       | f              | t
     21 |       2 | pg4      |     5432 | default  | f           | t        | secondary | default     | f              | t
(5 rows)
For a specific coordinator or worker group, we can list the node details using the “--group” option.
[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml list citus --group 1
+ citus cluster: citus (group: 1, 7522511395167405146) ---+----+-----------+
| Member         | Host               | Role    | State   | TL | Lag in MB |
+----------------+--------------------+---------+---------+----+-----------+
| Worker1        | 172.31.95.174:5432 | Leader  | running | 19 |           |
| Worker1Replica | 172.31.95.174:5432 | Replica | running |    |         0 |
+----------------+--------------------+---------+---------+----+-----------+
Data distribution
Let’s now distribute a table and see how its shards are kept on the worker nodes and their replicas.
- Creating a new table
citus=# create table dist_test (id integer primary key, t_name text);
CREATE TABLE
- Distributing the table based on the “id” column
citus=# SELECT create_distributed_table('dist_test', 'id');
 create_distributed_table
--------------------------
- Inserting some rows so that the data is distributed across the worker nodes
citus=# INSERT INTO dist_test (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 50000) AS i;
INSERT 0 50000
We can see below that the distribution works fine and the shards are evenly distributed among the worker nodes.
citus=# SELECT * FROM citus_shards WHERE table_name = 'dist_test'::regclass;
WARNING:  connection to the remote node postgres@pg0:5432 failed with the following error: fe_sendauth: no password supplied
 table_name | shardid |    shard_name    | citus_table_type | colocation_id | nodename | nodeport | shard_size
------------+---------+------------------+------------------+---------------+----------+----------+------------
 dist_test  |  102296 | dist_test_102296 | distributed      |             2 | pg3      |     5432 |     172032
 dist_test  |  102296 | dist_test_102296 | distributed      |             2 | pg1      |     5432 |     172032
 dist_test  |  102296 | dist_test_102296 | distributed      |             2 | pg2      |     5432 |     172032
 dist_test  |  102296 | dist_test_102296 | distributed      |             2 | pg4      |     5432 |     172032
 dist_test  |  102297 | dist_test_102297 | distributed      |             2 | pg2      |     5432 |     172032
 dist_test  |  102297 | dist_test_102297 | distributed      |             2 | pg4      |     5432 |     172032
 dist_test  |  102297 | dist_test_102297 | distributed      |             2 | pg3      |     5432 |     172032
 dist_test  |  102297 | dist_test_102297 | distributed      |             2 | pg1      |     5432 |     172032
 dist_test  |  102298 | dist_test_102298 | distributed      |             2 | pg3      |     5432 |     172032
...
 dist_test  |  102326 | dist_test_102326 | distributed      |             2 | pg2      |     5432 |     172032
 dist_test  |  102326 | dist_test_102326 | distributed      |             2 | pg4      |     5432 |     172032
 dist_test  |  102327 | dist_test_102327 | distributed      |             2 | pg2      |     5432 |     172032
 dist_test  |  102327 | dist_test_102327 | distributed      |             2 | pg4      |     5432 |     172032
 dist_test  |  102327 | dist_test_102327 | distributed      |             2 | pg3      |     5432 |     172032
 dist_test  |  102327 | dist_test_102327 | distributed      |             2 | pg1      |     5432 |     172032
citus=# SELECT nodename, count(*) FROM citus_shards WHERE table_name = 'dist_test'::regclass group by nodename;
 nodename | count
----------+-------
 pg2      |    32
 pg1      |    32
 pg4      |    32
 pg3      |    32
(4 rows)
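As an extra sanity check (a minimal sketch; the plan output varies by Citus version and environment), running EXPLAIN against the distributed table on the coordinator should typically show a “Custom Scan (Citus Adaptive)” node fanning the query out to the shard tasks:

citus=# EXPLAIN SELECT count(*) FROM dist_test;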
Failover
The failover/switchover process works much like it does for regular Patroni nodes. The only difference is that under Citus, we can invoke the failover/switchover for any single group of worker nodes.
In the example below, I am switching the leader of Worker Group “1” from “Worker1” to “Worker1 Replica”.
[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml switchover citus
Current cluster topology
+ citus cluster: citus ---------+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
|     0 | Coordinator     | 172.31.89.51  | Leader         | running   | 14 |           |
|     1 | Worker1         | 172.31.95.174 | Leader         | running   | 17 |           |
|     1 | Worker1 Replica | 172.31.87.156 | Quorum Standby | streaming | 17 |         0 |
|     2 | Worker2         | 172.31.28.124 | Leader         | running   | 15 |           |
|     2 | Worker2 Replica | 172.31.84.95  | Quorum Standby | streaming | 15 |         0 |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
citus group: 1
Primary [Worker1]:
Candidate ['Worker1 Replica'] []:
When should the switchover take place (e.g. 2025-07-19T08:19 )  [now]:
Are you sure you want to switchover cluster citus, demoting current leader Worker1? [y/N]: y
2025-07-19 07:19:22.93632 Successfully switched over to "Worker1 Replica"
+ citus cluster: citus (group: 1, 7522511395167405146) ----+-----------+
| Member          | Host          | Role    | State   | TL | Lag in MB |
+-----------------+---------------+---------+---------+----+-----------+
| Worker1         | 172.31.95.174 | Replica | stopped |    |   unknown |
| Worker1 Replica | 172.31.87.156 | Leader  | running | 17 |           |
+-----------------+---------------+---------+---------+----+-----------+
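The same switchover can also be driven non-interactively, which is handy for scripting. This is only a sketch and assumes your patronictl build supports the --group, --candidate, and --force options:

shell> /usr/local/bin/patronictl -c /etc/patroni.yml switchover citus --group 1 --candidate 'Worker1 Replica' --force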
Post switchover, the status will change for Worker Group “1” nodes as below.
[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml switchover citus
Current cluster topology
+ citus cluster: citus ---------+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
|     0 | Coordinator     | 172.31.89.51  | Leader         | running   | 14 |           |
|     1 | Worker1         | 172.31.95.174 | Quorum Standby | streaming | 18 |         0 |
|     1 | Worker1 Replica | 172.31.87.156 | Leader         | running   | 18 |           |
|     2 | Worker2         | 172.31.28.124 | Leader         | running   | 15 |           |
|     2 | Worker2 Replica | 172.31.84.95  | Quorum Standby | streaming | 15 |         0 |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
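We can also confirm on the coordinator that Worker Group 1 now resolves to the new leader in the Citus metadata (a minimal check; the nodename shown depends on how the nodes were registered, pg3 in this demo):

citus=# SELECT nodeid, groupid, nodename, nodeport, noderole FROM pg_dist_node WHERE groupid = 1;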
Other maintenance operations, such as restarting Patroni, will work the same way they do with standard Patroni nodes.
E.g.,
[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml restart citus Worker1
+ citus cluster: citus ---------+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
|     0 | Coordinator     | 172.31.89.51  | Leader         | running   |  9 |           |
|     1 | Worker1         | 172.31.95.174 | Leader         | running   | 10 |           |
|     1 | Worker1Replica  | 172.31.87.156 | Quorum Standby | streaming | 10 |         0 |
|     2 | Worker2         | 172.31.28.124 | Leader         | running   |  8 |           |
|     2 | Worker2 Replica | 172.31.84.95  | Quorum Standby | streaming |  8 |         0 |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
When should the restart take place (e.g. 2025-07-21T06:53)  [now]:
Are you sure you want to restart members Worker1? [y/N]: y
Restart if the PostgreSQL version is less than provided (e.g. 9.5.2)  []:
Success: restart on member Worker1
Final thought
This seamless integration of Citus and Patroni has unlocked the power of both write scalability and high availability features. Citus solves the problem of managing large datasets and individual big tables on a single machine by providing different sharding/distribution methods, which basically distribute or scale data over different worker nodes. The ability to fail over individual worker nodes and read from secondary workers is just a cherry on top.
Lastly, we don’t encourage using Citus in production unless you have fully assessed the pros and cons. Thorough testing in non-production environments is crucial before implementing Citus in production, as the add-on extension can bring its own challenges and can affect internal Postgres metadata and behavior.
Disclaimer: This blog is purely based on my evaluation of the Citus extension and does not reflect Percona’s recommendations/suggestions. I would suggest evaluating possible support coverage before planning for any production usage.