In my previous blog post, Integrating Citus with Patroni: Sharding and High Availability Together, I explored how to integrate Citus with Patroni and demonstrated how basic table distribution works. In this follow-up post, I will discuss the other Citus distribution models, explore how shard rebalancing and data movement are handled, and show how to achieve read scaling by targeting secondary worker nodes.
Let’s explore the different distribution categories one by one. For ease of reference, the cluster and node details are listed below.
/usr/local/bin/patronictl -c /etc/patroni.yml list citus
+ Citus cluster: citus ---+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
| 0     | Coordinator     | 172.31.89.51  | Leader         | running   | 10 |           |
| 1     | Worker1         | 172.31.95.174 | Leader         | running   | 12 |           |
| 1     | Worker1Replica  | 172.31.87.156 | Quorum Standby | streaming | 12 |         0 |
| 2     | Worker2         | 172.31.28.124 | Quorum Standby | streaming |  9 |         0 |
| 2     | Worker2 Replica | 172.31.84.95  | Leader         | running   |  9 |           |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
citus=# SELECT * FROM pg_dist_node;
 nodeid | groupid |   nodename    | nodeport | noderack | hasmetadata | isactive | noderole  | nodecluster | metadatasynced | shouldhaveshards
--------+---------+---------------+----------+----------+-------------+----------+-----------+-------------+----------------+------------------
      1 |       0 | 172.31.89.51  |     5432 | default  | t           | t        | primary   | default     | t              | f
    178 |       2 | 172.31.28.124 |     5432 | default  | f           | t        | secondary | default     | f              | t
   1056 |       1 | 172.31.87.156 |     5432 | default  | f           | t        | secondary | default     | f              | t
    152 |       1 | 172.31.95.174 |     5432 | default  | t           | t        | primary   | default     | t              | t
    168 |       2 | 172.31.84.95  |     5432 | default  | t           | t        | primary   | default     | t              | t
1) Schema-based sharding
As the name suggests, each “schema” inside the database is sharded, or segregated, onto its own dedicated worker node. We don’t need to distribute individual tables; they automatically reside on the worker node dedicated to their schema.
This can be useful for multi-tenant applications or microservices that keep a dedicated schema per tenant or service. The application simply switches the search_path to work with a different schema.
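For example, here is a minimal sketch of how a tenant-aware application might route its queries; the tenant_a/tenant_b schemas and the orders table are hypothetical names used only for illustration:

-- Scope the session to one tenant's schema; queries resolve to that schema's tables,
-- which live on the worker node dedicated to the schema.
SET search_path TO tenant_a;
SELECT count(*) FROM orders;   -- resolves to tenant_a.orders

-- Switching tenants only requires changing the search_path.
SET search_path TO tenant_b;
SELECT count(*) FROM orders;   -- now resolves to tenant_b.orders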
- Below, we create two different schemas (s1 and s2) on the coordinator node and then distribute them over the worker nodes.
citus=# create schema s1;
CREATE SCHEMA
citus=# create table s1.s1_tbl (id integer primary key, t_name text);
CREATE TABLE
citus=# INSERT INTO s1.s1_tbl (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# create table s1.s1_tbl2 (id integer primary key, t_name text);
CREATE TABLE
citus=# INSERT INTO s1.s1_tbl2 (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# SELECT citus_schema_distribute('s1');
NOTICE:  distributing the schema s1
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$s1.s1_tbl$$)
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$s1.s1_tbl2$$)
 citus_schema_distribute
-------------------------
citus=# create schema s2;
CREATE SCHEMA
citus=# create table s2.s2_tbl (id integer primary key, t_name text);
CREATE TABLE
citus=# INSERT INTO s2.s2_tbl (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# SELECT citus_schema_distribute('s2');
NOTICE:  distributing the schema s2
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$s2.s2_tbl$$)
 citus_schema_distribute
-------------------------

(1 row)
- We can quickly check some basic information related to an individual schema.
citus=# select * from citus_schemas;
 schema_name | colocation_id | schema_size | schema_owner
-------------+---------------+-------------+--------------
 s1          |            23 | 1440 kB     | postgres
 s2          |            24 | 720 kB      | postgres
(2 rows)
- Here, we can see more detailed information about each individual table placement. Schema “s1” tables reside on worker node 172.31.95.174 (and its replica), while “s2” tables reside on a different worker node, 172.31.84.95 (and its replica). Any new tables created in these schemas will be placed only on their designated worker node.
citus=# select table_name, shardid, colocation_id, nodename, nodeport, shard_size from citus_shards where citus_table_type = 'schema';
 table_name | shardid | colocation_id |   nodename    | nodeport | shard_size
------------+---------+---------------+---------------+----------+------------
 s1.s1_tbl  |  104290 |            23 | 172.31.87.156 |     5432 |     737280
 s1.s1_tbl  |  104290 |            23 | 172.31.95.174 |     5432 |     737280
 s1.s1_tbl2 |  104291 |            23 | 172.31.87.156 |     5432 |     737280
 s1.s1_tbl2 |  104291 |            23 | 172.31.95.174 |     5432 |     737280
 s2.s2_tbl  |  104292 |            24 | 172.31.28.124 |     5432 |     737280
 s2.s2_tbl  |  104292 |            24 | 172.31.84.95  |     5432 |     737280
(6 rows)
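If a distributed schema later needs to be converted back into a regular local schema on the coordinator, Citus provides a companion function for that as well; a minimal sketch (not run as part of this demo, and availability depends on the Citus version in use):

-- Moves the schema's tables back to the coordinator and removes its dedicated placement.
SELECT citus_schema_undistribute('s1');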
Query routing:
- When we query in schema “s1”, the data is fetched from the Leader worker node (172.31.95.174).
citus=# SET search_path TO s1;;
SET
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# explain (verbose on) select count(*) from s1_tbl;
                                           QUERY PLAN
-----------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM s1.s1_tbl_104290 s1_tbl
         Node: host=172.31.95.174 port=5432 dbname=citus
         ->  Aggregate  (cost=180.00..180.01 rows=1 width=8)
               Output: count(*)
               ->  Seq Scan on s1.s1_tbl_104290 s1_tbl  (cost=0.00..155.00 rows=10000 width=0)
                     Output: id, t_name
(11 rows)
- When we query in schema “s2”, the data is fetched from the Leader worker node (172.31.84.95).
citus=# SET search_path TO s2;
SET
citus=# explain (verbose on) select count(*) from s2_tbl;
                                           QUERY PLAN
-----------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM s2.s2_tbl_104292 s2_tbl
         Node: host=172.31.84.95 port=5432 dbname=citus
         ->  Aggregate  (cost=180.00..180.01 rows=1 width=8)
               Output: count(*)
               ->  Seq Scan on s2.s2_tbl_104292 s2_tbl  (cost=0.00..155.00 rows=10000 width=0)
                     Output: id, t_name
(11 rows)
2) Reference tables
When a table is referenced by other tables, for example through foreign keys or frequent join lookups, we can place a full copy of it on every worker node with the “create_reference_table()” function so that JOIN queries against it can run locally. This can improve retrieval efficiency compared to fetching rows from remote worker nodes. In effect, the underlying parent table is replicated to each worker node.
- Here, we create two tables (r1 and r2) with a foreign key relationship.
citus=# CREATE TABLE r1 (
    id bigint,
    t_name text,
    PRIMARY KEY (id)
);
CREATE TABLE
citus=# INSERT INTO r1 (id, t_name)
SELECT i, 'name_' || i
FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# CREATE TABLE r2 (
    id bigint,
    t_name text,
    PRIMARY KEY(ID),
    FOREIGN KEY (id) REFERENCES r1 (id)
);
CREATE TABLE
citus=# INSERT INTO r2 (id, t_name)
SELECT i, 'name_' || i
FROM generate_series(1, 10000) AS i;
INSERT 0 10000
- Then, we replicate table “r1” as a reference table across all worker nodes.
citus=# SELECT create_reference_table('r1');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.r1$$)
NOTICE:  local tables that are added to metadata automatically by citus, but not chained with reference tables via foreign keys might be automatically converted back to postgres tables
HINT:  Executing citus_add_local_table_to_metadata($$public.r2$$) prevents this for the given relation, and all of the connected relations
 create_reference_table
------------------------

(1 row)
- Finally, we distribute the child table “r2” across the Leader worker nodes.
citus=# SELECT create_distributed_table('r2', 'id');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.r2$$)
 create_distributed_table
--------------------------

(1 row)
- If we now inspect the shard metadata, we can clearly see that the parent table “r1” is replicated to every node with the same “shardid” and “colocation_id”.
citus=# SELECT * FROM citus_shards WHERE table_name = 'r1'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.28.124 |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.84.95  |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.89.51  |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.87.156 |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.95.174 |     5432 |     811008
(5 rows)
- However, the Child table “r2” is evenly distributed among all Leader worker nodes.
citus=# SELECT * FROM citus_shards WHERE table_name = 'r2'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r2         |  104331 | r2_104331  | distributed      |            28 | 172.31.84.95  |     5432 |      49152
 r2         |  104331 | r2_104331  | distributed      |            28 | 172.31.28.124 |     5432 |      49152
 r2         |  104332 | r2_104332  | distributed      |            28 | 172.31.95.174 |     5432 |      40960
 r2         |  104332 | r2_104332  | distributed      |            28 | 172.31.87.156 |     5432 |      40960
 r2         |  104333 | r2_104333  | distributed      |            28 | 172.31.84.95  |     5432 |      40960
 r2         |  104333 | r2_104333  | distributed      |            28 | 172.31.28.124 |     5432 |      40960
 r2         |  104334 | r2_104334  | distributed      |            28 | 172.31.95.174 |     5432 |      40960
 r2         |  104334 | r2_104334  | distributed      |            28 | 172.31.87.156 |     5432 |      40960
 r2         |  104335 | r2_104335  | distributed      |            28 | 172.31.84.95  |     5432 |      40960
 r2         |  104335 | r2_104335  | distributed      |            28 | 172.31.28.124 |     5432 |      40960
 r2         |  104336 | r2_104336  | distributed      |            28 | 172.31.87.156 |     5432 |      40960
 r2         |  104336 | r2_104336  | distributed      |            28 | 172.31.95.174 |     5432 |      40960
 r2         |  104337 | r2_104337  | distributed      |            28 | 172.31.28.124 |     5432 |      40960
 r2         |  104337 | r2_104337  | distributed      |            28 | 172.31.84.95  |     5432 |      40960
 r2         |  104338 | r2_104338  | distributed      |            28 | 172.31.87.156 |     5432 |      49152
 r2         |  104338 | r2_104338  | distributed      |            28 | 172.31.95.174 |     5432 |      49152
 r2         |  104339 | r2_104339  | distributed      |            28 | 172.31.84.95  |     5432 |      49152
 r2         |  104339 | r2_104339  | distributed      |            28 | 172.31.28.124 |     5432 |      49152
 r2         |  104340 | r2_104340  | distributed      |            28 | 172.31.87.156 |     5432 |      49152
 r2         |  104340 | r2_104340  | distributed      |            28 | 172.31.95.174 |     5432 |      49152
Query routing:
The output is retrieved from one of the designated leader worker nodes, in accordance with the underlying data distribution.
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# explain (verbose on) select count(*) from r1 join r2 using (id) where r1.id = 5000;
                                                                      QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM (public.r1_104329 r1(id, t_name) JOIN public.r2_104334 r2(id, t_name) USING (id)) WHERE (r1.id OPERATOR(pg_catalog.=) 5000)
         Node: host=172.31.95.174 port=5432 dbname=citus
         ->  Aggregate  (cost=10.16..10.17 rows=1 width=8)
               Output: count(*)
               ->  Nested Loop  (cost=0.29..10.16 rows=1 width=0)
                     ->  Index Only Scan using r1_pkey_104329 on public.r1_104329 r1  (cost=0.29..4.30 rows=1 width=8)
                           Output: r1.id
                           Index Cond: (r1.id = 5000)
                     ->  Seq Scan on public.r2_104334 r2  (cost=0.00..5.85 rows=1 width=8)
                           Output: r2.id, r2.t_name
                           Filter: (r2.id = 5000)
(16 rows)
3) Co-location tables
If distributed tables share a common distribution column, we can co-locate them to enable efficient distributed joins and foreign key relationships. Co-location means that shards covering the same hash range always reside on the same node.
- Below, we create two tables that share a common distribution column and a foreign key relationship.
citus=# CREATE TABLE r1 (
    id bigint,
    t_name text,
    PRIMARY KEY (id)
);
CREATE TABLE

citus=# CREATE TABLE r2 (
    id bigint,
    t_name text,
    PRIMARY KEY(ID),
    FOREIGN KEY (id) REFERENCES r1 (id)
);
CREATE TABLE
- The child table “r2” is distributed so that it co-locates with the parent table “r1” on the common distribution column “id”.
citus=# SELECT create_distributed_table('r1', 'id');
 create_distributed_table
--------------------------

(1 row)

citus=# SELECT create_distributed_table('r2', 'id', colocate_with := 'r1');
 create_distributed_table
--------------------------
- Then, we perform some DML operations to load data into both tables.
citus=# INSERT INTO r1 (id, t_name)
SELECT i, 'name_' || i
FROM generate_series(1, 10000) AS i;
INSERT 0 10000

citus=# INSERT INTO r2 (id, t_name)
SELECT i, 'name_' || i
FROM generate_series(1, 10000) AS i;
INSERT 0 10000
- In the output below, we can see that the shards of both tables (“r1” and “r2”) holding the same distribution value are placed on the same Leader worker node and its replica. The colocation_id is the same for both.
citus=# SELECT get_shard_id_for_distribution_column('r1', 5000);
 get_shard_id_for_distribution_column
--------------------------------------
                               104654
(1 row)

citus=# SELECT * FROM citus_shards WHERE shardid = 104654;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
(2 rows)
citus=# SELECT get_shard_id_for_distribution_column('r2', 5000);
 get_shard_id_for_distribution_column
--------------------------------------
                               104686
(1 row)

citus=# SELECT * FROM citus_shards WHERE shardid = 104686;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
(2 rows)
- Here is the individual shard distribution in more detail.
citus=# SELECT * FROM citus_shards WHERE table_name = 'r1'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r1         |  104651 | r1_104651  | distributed      |            38 | 172.31.84.95  |     5432 |      73728
 r1         |  104651 | r1_104651  | distributed      |            38 | 172.31.28.124 |     5432 |      73728
 r1         |  104652 | r1_104652  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r1         |  104652 | r1_104652  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r1         |  104653 | r1_104653  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
 r1         |  104653 | r1_104653  | distributed      |            38 | 172.31.28.124 |     5432 |      65536
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
citus=# SELECT * FROM citus_shards WHERE table_name = 'r2'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r2         |  104683 | r2_104683  | distributed      |            38 | 172.31.28.124 |     5432 |      73728
 r2         |  104683 | r2_104683  | distributed      |            38 | 172.31.84.95  |     5432 |      73728
 r2         |  104684 | r2_104684  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104684 | r2_104684  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r2         |  104685 | r2_104685  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
 r2         |  104685 | r2_104685  | distributed      |            38 | 172.31.28.124 |     5432 |      65536
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r2         |  104687 | r2_104687  | distributed      |            38 | 172.31.28.124 |     5432 |      65536
 r2         |  104687 | r2_104687  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
 r2         |  104688 | r2_104688  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r2         |  104688 | r2_104688  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104689 | r2_104689  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
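Co-location can also be confirmed at the table level; a quick check against the citus_tables view (assuming it is available in this Citus version) should report the same colocation_id for both tables:

-- Both rows should show the same colocation_id (38 in this demo).
SELECT table_name, citus_table_type, distribution_column, colocation_id
FROM citus_tables
WHERE table_name IN ('r1'::regclass, 'r2'::regclass);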
Query routing:
The query routing behavior is much the same as we saw earlier. As with reference tables, we can run join queries across the two tables, and Citus routes them to the node holding the matching shards.
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# explain (verbose on) select count(*) from r1 join r2 using (id) where r1.id = 5000;
                                                                      QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM (public.r1_104654 r1(id, t_name) JOIN public.r2_104686 r2(id, t_name) USING (id)) WHERE (r1.id OPERATOR(pg_catalog.=) 5000)
         Node: host=172.31.95.174 port=5432 dbname=citus
         ->  Aggregate  (cost=11.71..11.72 rows=1 width=8)
               Output: count(*)
               ->  Nested Loop  (cost=0.00..11.71 rows=1 width=0)
                     ->  Seq Scan on public.r1_104654 r1  (cost=0.00..5.85 rows=1 width=8)
                           Output: r1.id, r1.t_name
                           Filter: (r1.id = 5000)
                     ->  Seq Scan on public.r2_104686 r2  (cost=0.00..5.85 rows=1 width=8)
                           Output: r2.id, r2.t_name
                           Filter: (r2.id = 5000)
(16 rows)
Note: The key difference between reference tables and co-location is that, with a reference table, one of the tables is replicated to all worker nodes, making it locally available everywhere. With co-location, both tables are distributed, but in such a way that related rows reside on the same worker node.
4) Columnar storage
For analytical or data warehousing workloads, we can use the columnar access method, where data is stored on disk by column rather than by row, in a compressed form. Combined with distribution, the data and the write load are still spread across the backend worker nodes.
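Columnar behavior can also be tuned before a table is created; a hedged sketch of the commonly available session-level settings (the exact GUC names, values, and defaults depend on the Citus release, and the col_events table is only illustrative):

-- Compression codec and chunking settings applied to newly created columnar tables.
SET columnar.compression TO 'zstd';
SET columnar.stripe_row_limit TO 150000;
SET columnar.chunk_group_row_limit TO 10000;

CREATE TABLE col_events (
    id bigint,
    payload jsonb NOT NULL
) USING columnar;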
- Here, we create a table of columnar type and push some random data.
citus=# CREATE TABLE col_test (
    id bigint,
    data jsonb not null
) USING columnar;
citus=# INSERT INTO col_test (id, data)
SELECT d, '{"hello":"columnar"}' FROM generate_series(1,10000000) d;
INSERT 0 10000000
- We then distribute the table on the “id” column.
citus=# SELECT create_distributed_table('col_test', 'id');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.col_test$$)
 create_distributed_table
--------------------------

(1 row)
citus=# SELECT * FROM citus_shards WHERE table_name = 'col_test'::regclass;
 table_name | shardid |   shard_name    | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+-----------------+------------------+---------------+---------------+----------+------------
 col_test   |  104715 | col_test_104715 | distributed      |            38 | 172.31.28.124 |     5432 |     516096
 col_test   |  104715 | col_test_104715 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
 col_test   |  104716 | col_test_104716 | distributed      |            38 | 172.31.95.174 |     5432 |     516096
 col_test   |  104716 | col_test_104716 | distributed      |            38 | 172.31.87.156 |     5432 |     516096
 col_test   |  104717 | col_test_104717 | distributed      |            38 | 172.31.28.124 |     5432 |     516096
 col_test   |  104717 | col_test_104717 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
 col_test   |  104718 | col_test_104718 | distributed      |            38 | 172.31.87.156 |     5432 |     516096
 col_test   |  104718 | col_test_104718 | distributed      |            38 | 172.31.95.174 |     5432 |     516096
 col_test   |  104719 | col_test_104719 | distributed      |            38 | 172.31.28.124 |     5432 |     516096
 col_test   |  104719 | col_test_104719 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
 col_test   |  104720 | col_test_104720 | distributed      |            38 | 172.31.87.156 |     5432 |     516096
 col_test   |  104720 | col_test_104720 | distributed      |            38 | 172.31.95.174 |     5432 |     516096
 col_test   |  104721 | col_test_104721 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
Query routing:
Query routing for columnar tables works the same way as for the other table types we explored earlier; note the ColumnarScan node and the chunk group filter in the plan below.
citus=# SET citus.explain_all_tasks TO on;
citus=# explain (verbose on) select count(*) from col_test where col_test.id = 10000;
                                                       QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM public.col_test_104741 col_test WHERE (id OPERATOR(pg_catalog.=) 10000)
         Node: host=172.31.84.95 port=5432 dbname=citus
         ->  Aggregate  (cost=13.62..13.63 rows=1 width=8)
               Output: count(*)
               ->  Custom Scan (ColumnarScan) on public.col_test_104741 col_test  (cost=0.00..9.71 rows=1564 width=0)
                     Filter: (col_test.id = 10000)
                     Columnar Projected Columns: id
                     Columnar Chunk Group Filters: (id = 10000)
(13 rows)
Read scaling via secondary worker nodes
- By default, Citus serves reads from the worker Leader (primary) nodes only.
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# EXPLAIN (VERBOSE ON) SELECT count(*) FROM dist_test;
                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8)
   Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Output: remote_scan.count
         Task Count: 32
         Tasks Shown: All
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102040 dist_test WHERE true
               Node: host=172.31.28.124 port=5432 dbname=citus
               ->  Aggregate  (cost=31.05..31.06 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102040 dist_test  (cost=0.00..27.24 rows=1524 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102041 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=31.05..31.06 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102041 dist_test  (cost=0.00..27.24 rows=1524 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102042 dist_test WHERE true
               Node: host=172.31.28.124 port=5432 dbname=citus
               ->  Aggregate  (cost=31.05..31.06 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102042 dist_test  (cost=0.00..27.24 rows=1524 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102043 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=29.26..29.27 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102043 dist_test  (cost=0.00..25.21 rows=1621 width=0)
                           Output: id, t_name
- However, changing “citus.use_secondary_nodes” from the default “never” to “always” allows the worker node replicas (secondaries) to serve read requests.
citus=# SHOW citus.use_secondary_nodes;
 citus.use_secondary_nodes
---------------------------
 never
(1 row)
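Before flipping the setting, it can be useful to confirm which nodes are registered as secondaries in the Citus metadata; a quick check against pg_dist_node (the full view was shown at the top of this post):

-- Lists the worker replicas that Citus can use when use_secondary_nodes is 'always'.
SELECT nodeid, groupid, nodename, noderole
FROM pg_dist_node
WHERE noderole = 'secondary';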
- We can change this setting either by editing the Patroni configuration directly or dynamically via ALTER SYSTEM.
parameters:
  citus.use_secondary_nodes: 'always'
Or
postgres=# ALTER SYSTEM SET citus.use_secondary_nodes TO 'always';
postgres=# SELECT pg_reload_conf();
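Alternatively, since the cluster is managed by Patroni, the parameter can be pushed through Patroni’s dynamic configuration; a sketch assuming the same patroni.yml path used earlier (the exact workflow may vary with your Patroni setup):

/usr/local/bin/patronictl -c /etc/patroni.yml edit-config citus
# In the editor, add the parameter under the postgresql -> parameters section:
#   citus.use_secondary_nodes: 'always'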
- Once the change is applied, we can see the read requests being served by the Standby nodes.
postgres=# SHOW citus.use_secondary_nodes;
 citus.use_secondary_nodes
---------------------------
 always
(1 row)
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# EXPLAIN (VERBOSE ON) SELECT count(*) FROM dist_test;
                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8)
   Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Output: remote_scan.count
         Task Count: 32
         Tasks Shown: All
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102040 dist_test WHERE true
               Node: host=172.31.84.95 port=5432 dbname=citus
               ->  Aggregate  (cost=28.34..28.35 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102040 dist_test  (cost=0.00..24.47 rows=1547 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102041 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=28.33..28.34 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102041 dist_test  (cost=0.00..24.46 rows=1546 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102042 dist_test WHERE true
               Node: host=172.31.84.95 port=5432 dbname=citus
               ->  Aggregate  (cost=28.06..28.07 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102042 dist_test  (cost=0.00..24.25 rows=1525 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102043 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=29.26..29.27 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102043 dist_test  (cost=0.00..25.21 rows=1621 width=0)
                           Output: id, t_name
[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml list citus
+ Citus cluster: citus ---+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
| 0     | Coordinator     | 172.31.89.51  | Leader         | running   |  9 |           |
| 1     | Worker1         | 172.31.95.174 | Quorum Standby | streaming |  7 |         0 |
| 1     | Worker1Replica  | 172.31.87.156 | Leader         | running   |  7 |           |
| 2     | Worker2         | 172.31.28.124 | Leader         | running   |  8 |           |
| 2     | Worker2 Replica | 172.31.84.95  | Quorum Standby | streaming |  8 |         0 |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
- But there is one caveat with this approach: every connection is routed to the read-only replicas, which leads to errors like the one below whenever any write activity happens.
citus=# SELECT create_distributed_table('t5', 'id');
ERROR:  cannot execute CREATE TABLE in a read-only transaction
CONTEXT:  while executing command on 172.31.95.174:5432
- The solution is to set “citus.use_secondary_nodes” back to “never”, but only for the particular session/client that needs to write, as shown below.
bash-5.1$ export PGOPTIONS="-c citus.use_secondary_nodes=never"
bash-5.1$ psql -U postgres -d citus
Password for user postgres:
psql (16.9)
Type "help" for help.

citus=# SHOW citus.use_secondary_nodes;
 citus.use_secondary_nodes
---------------------------
 never
(1 row)
citus=# SELECT create_distributed_table('t5', 'id');
 create_distributed_table
--------------------------

(1 row)
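The same per-connection override can also be passed through the libpq options connection parameter instead of the PGOPTIONS environment variable; a small sketch (the connection details are illustrative, reusing the coordinator address from this setup):

psql "host=172.31.89.51 dbname=citus user=postgres options='-c citus.use_secondary_nodes=never'"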
Shard balancing/moving
If we want to move a specific shard to a different worker node, we can use the “citus_move_shard_placement()” function.
- Initially, shardid “104756” resides in the Leader Worker node (172.31.95.174).
citus=# SELECT get_shard_id_for_distribution_column('t1', 1000);
 get_shard_id_for_distribution_column
--------------------------------------
                               104756
(1 row)
citus=# SELECT * FROM citus_shards WHERE shardid = 104756;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.87.156 |     5432 |      65536
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.95.174 |     5432 |      65536
(2 rows)
- Then, we moved the shard to another Leader worker node (172.31.84.95).
citus=# SELECT citus_move_shard_placement(104756, '172.31.95.174', 5432, '172.31.84.95', 5432);
 citus_move_shard_placement
----------------------------

(1 row)
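citus_move_shard_placement() also accepts an optional shard_transfer_mode argument that controls how the move is performed; a hedged sketch, not run in this demo (the default mode depends on the Citus version):

-- 'force_logical' keeps the shard writable during the move (requires logical replication),
-- while 'block_writes' blocks writes for the duration of the copy.
SELECT citus_move_shard_placement(104756,
                                  '172.31.95.174', 5432,
                                  '172.31.84.95', 5432,
                                  shard_transfer_mode := 'force_logical');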
- Now, we can see shardid “104756” reflected on the new worker node and its replica.
citus=# SELECT * FROM citus_shards WHERE shardid = 104756;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.28.124 |     5432 |      65536
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.84.95  |     5432 |      65536
(2 rows)
We can also rebalance shards when the data is not evenly distributed and one worker ends up holding more shards than the others.
- Here, we can see that the Leader worker node (172.31.84.95) holds more shards than (172.31.95.174).
citus=# select nodename,count(*) from citus_shards group by nodename;
   nodename    | count
---------------+-------
 172.31.95.174 |    11
 172.31.87.156 |    11
 172.31.28.124 |    21
 172.31.84.95  |    21
(4 rows)
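Before actually moving any data, the planned moves can be previewed; a minimal sketch using the rebalancer’s planning function (not executed as part of this demo):

-- Shows which shards the rebalancer would move, and to where, without moving anything.
SELECT * FROM get_rebalance_table_shards_plan('t1');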
- Now, we rebalance the shards of table “t1”.
citus=# SELECT rebalance_table_shards('t1');
NOTICE:  Moving shard 104747 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
NOTICE:  Moving shard 104749 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
NOTICE:  Moving shard 104750 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
NOTICE:  Moving shard 104751 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
 rebalance_table_shards
------------------------
- The shards are now distributed more evenly across the nodes.
citus=# select nodename,count(*) from citus_shards group by nodename;
   nodename    | count
---------------+-------
 172.31.95.174 |    15
 172.31.87.156 |    15
 172.31.28.124 |    17
 172.31.84.95  |    17
(4 rows)
Final thoughts on Citus distribution models
This blog post covered the key Citus data distribution methods, along with their functions and query routing behavior. Another feature worth mentioning is “create_distributed_table_concurrently()”, which allows distributing a table without downtime; it can be beneficial when an existing table/data set needs to be distributed over the worker nodes, since the usual “create_distributed_table()” function can block DML until the sharding process completes. We also demonstrated how to improve read scalability by routing read queries to secondary worker nodes. Finally, we touched on shard rebalancing and shard movement, both of which play a crucial role in data stability and performance.
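For completeness, a hedged sketch of how the concurrent variant would be invoked (not part of the demo above; the table name is illustrative, and the function requires a Citus release that supports it):

-- Distributes an existing table while allowing concurrent reads and writes;
-- it cannot be run inside an explicit transaction block.
SELECT create_distributed_table_concurrently('existing_big_table', 'id');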
This blog is purely based on my evaluation of the Citus extension and does not reflect Percona’s recommendations/suggestions. Again, I would emphasize that before using any specific distribution model, please carefully assess its pros and cons and evaluate possible support coverage. Testing in a non-production environment is highly recommended to better understand the model and avoid unforeseen issues in production later on.