In my previous blog post, Integrating Citus with Patroni: Sharding and High Availability Together, I explored how to integrate Citus with Patroni and demonstrated how basic table distribution works. In this follow-up post, I will discuss the other Citus distribution models, look at how shard rebalancing and data movement are handled, and explore read scaling by targeting secondary worker nodes.
Let’s explore the different distribution categories one by one. I have added the node/cluster details below for reference.
/usr/local/bin/patronictl -c /etc/patroni.yml list citus
+ Citus cluster: citus ----+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
|     0 | Coordinator     | 172.31.89.51  | Leader         | running   | 10 |           |
|     1 | Worker1         | 172.31.95.174 | Leader         | running   | 12 |           |
|     1 | Worker1Replica  | 172.31.87.156 | Quorum Standby | streaming | 12 |         0 |
|     2 | Worker2         | 172.31.28.124 | Quorum Standby | streaming |  9 |         0 |
|     2 | Worker2 Replica | 172.31.84.95  | Leader         | running   |  9 |           |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
citus=# SELECT * FROM pg_dist_node;
 nodeid | groupid |   nodename    | nodeport | noderack | hasmetadata | isactive | noderole  | nodecluster | metadatasynced | shouldhaveshards
--------+---------+---------------+----------+----------+-------------+----------+-----------+-------------+----------------+------------------
      1 |       0 | 172.31.89.51  |     5432 | default  | t           | t        | primary   | default     | t              | f
    178 |       2 | 172.31.28.124 |     5432 | default  | f           | t        | secondary | default     | f              | t
   1056 |       1 | 172.31.87.156 |     5432 | default  | f           | t        | secondary | default     | f              | t
    152 |       1 | 172.31.95.174 |     5432 | default  | t           | t        | primary   | default     | t              | t
    168 |       2 | 172.31.84.95  |     5432 | default  | t           | t        | primary   | default     | t              | t
The first model is schema-based sharding. As the name suggests, each schema inside the database is sharded, or segregated, onto its own dedicated worker node. We don’t need to distribute individual tables; they automatically reside on the shard node dedicated to their schema.
This can be useful when a multi-tenant application or a set of microservices has a dedicated schema for each tenant or service. The application switches between schemas by changing the search_path.
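As a side note, newer Citus releases (12 and later) can also shard newly created schemas automatically via the citus.enable_schema_based_sharding setting, instead of calling citus_schema_distribute() on each schema afterwards. A minimal sketch (the schema name s3 is hypothetical):

```sql
-- With this GUC enabled, every schema created in the session becomes a
-- distributed schema automatically (no citus_schema_distribute() needed).
SET citus.enable_schema_based_sharding TO on;

CREATE SCHEMA s3;                              -- automatically distributed
CREATE TABLE s3.s3_tbl (id int PRIMARY KEY);   -- placed on s3's worker node
```

This avoids the copy step shown below for schemas created after the setting is enabled.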
citus=# create schema s1;
CREATE SCHEMA
citus=# create table s1.s1_tbl (id integer primary key, t_name text);
CREATE TABLE
citus=# INSERT INTO s1.s1_tbl (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# create table s1.s1_tbl2 (id integer primary key, t_name text);
CREATE TABLE
citus=# INSERT INTO s1.s1_tbl2 (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# SELECT citus_schema_distribute('s1');
NOTICE:  distributing the schema s1
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$s1.s1_tbl$$)
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$s1.s1_tbl2$$)
 citus_schema_distribute
-------------------------
citus=# create schema s2;
CREATE SCHEMA
citus=# create table s2.s2_tbl (id integer primary key, t_name text);
CREATE TABLE
citus=# INSERT INTO s2.s2_tbl (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# SELECT citus_schema_distribute('s2');
NOTICE:  distributing the schema s2
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$s2.s2_tbl$$)
 citus_schema_distribute
-------------------------

(1 row)
citus=# select * from citus_schemas;
 schema_name | colocation_id | schema_size | schema_owner
-------------+---------------+-------------+--------------
 s1          |            23 | 1440 kB     | postgres
 s2          |            24 | 720 kB      | postgres
(2 rows)
citus=# select table_name, shardid, colocation_id, nodename, nodeport, shard_size from citus_shards where citus_table_type = 'schema';
 table_name | shardid | colocation_id |   nodename    | nodeport | shard_size
------------+---------+---------------+---------------+----------+------------
 s1.s1_tbl  |  104290 |            23 | 172.31.87.156 |     5432 |     737280
 s1.s1_tbl  |  104290 |            23 | 172.31.95.174 |     5432 |     737280
 s1.s1_tbl2 |  104291 |            23 | 172.31.87.156 |     5432 |     737280
 s1.s1_tbl2 |  104291 |            23 | 172.31.95.174 |     5432 |     737280
 s2.s2_tbl  |  104292 |            24 | 172.31.28.124 |     5432 |     737280
 s2.s2_tbl  |  104292 |            24 | 172.31.84.95  |     5432 |     737280
(6 rows)
citus=# SET search_path TO s1;
SET
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# explain (verbose on) select count(*) from s1_tbl;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM s1.s1_tbl_104290 s1_tbl
         Node: host=172.31.95.174 port=5432 dbname=citus
         ->  Aggregate  (cost=180.00..180.01 rows=1 width=8)
               Output: count(*)
               ->  Seq Scan on s1.s1_tbl_104290 s1_tbl  (cost=0.00..155.00 rows=10000 width=0)
                     Output: id, t_name
(11 rows)
citus=# SET search_path TO s2;
SET
citus=# explain (verbose on) select count(*) from s2_tbl;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM s2.s2_tbl_104292 s2_tbl
         Node: host=172.31.84.95 port=5432 dbname=citus
         ->  Aggregate  (cost=180.00..180.01 rows=1 width=8)
               Output: count(*)
               ->  Seq Scan on s2.s2_tbl_104292 s2_tbl  (cost=0.00..155.00 rows=10000 width=0)
                     Output: id, t_name
(11 rows)
The next model is the reference table. When tables are related, whether through foreign keys or frequent joins, the create_reference_table() function places a full copy of the reference table on every worker node so that JOIN queries can run locally. This can improve retrieval efficiency by avoiding row fetches from remote worker nodes. The process essentially replicates the underlying parent table to each worker node.
citus=# CREATE TABLE r1 (
    id bigint,
    t_name text,
    PRIMARY KEY (id)
);
CREATE TABLE
citus=# INSERT INTO r1 (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# CREATE TABLE r2 (
    id bigint,
    t_name text,
    PRIMARY KEY (id),
    FOREIGN KEY (id) REFERENCES r1 (id)
);
CREATE TABLE
citus=# INSERT INTO r2 (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# SELECT create_reference_table('r1');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.r1$$)
NOTICE:  local tables that are added to metadata automatically by citus, but not chained with reference tables via foreign keys might be automatically converted back to postgres tables
HINT:  Executing citus_add_local_table_to_metadata($$public.r2$$) prevents this for the given relation, and all of the connected relations
 create_reference_table
------------------------

(1 row)
citus=# SELECT create_distributed_table('r2', 'id');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.r2$$)
 create_distributed_table
--------------------------

(1 row)
citus=# SELECT * FROM citus_shards WHERE table_name = 'r1'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.28.124 |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.84.95  |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.89.51  |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.87.156 |     5432 |     811008
 r1         |  104329 | r1_104329  | reference        |            27 | 172.31.95.174 |     5432 |     811008
(5 rows)
citus=# SELECT * FROM citus_shards WHERE table_name = 'r2'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r2         |  104331 | r2_104331  | distributed      |            28 | 172.31.84.95  |     5432 |      49152
 r2         |  104331 | r2_104331  | distributed      |            28 | 172.31.28.124 |     5432 |      49152
 r2         |  104332 | r2_104332  | distributed      |            28 | 172.31.95.174 |     5432 |      40960
 r2         |  104332 | r2_104332  | distributed      |            28 | 172.31.87.156 |     5432 |      40960
 r2         |  104333 | r2_104333  | distributed      |            28 | 172.31.84.95  |     5432 |      40960
 r2         |  104333 | r2_104333  | distributed      |            28 | 172.31.28.124 |     5432 |      40960
 r2         |  104334 | r2_104334  | distributed      |            28 | 172.31.95.174 |     5432 |      40960
 r2         |  104334 | r2_104334  | distributed      |            28 | 172.31.87.156 |     5432 |      40960
 r2         |  104335 | r2_104335  | distributed      |            28 | 172.31.84.95  |     5432 |      40960
 r2         |  104335 | r2_104335  | distributed      |            28 | 172.31.28.124 |     5432 |      40960
 r2         |  104336 | r2_104336  | distributed      |            28 | 172.31.87.156 |     5432 |      40960
 r2         |  104336 | r2_104336  | distributed      |            28 | 172.31.95.174 |     5432 |      40960
 r2         |  104337 | r2_104337  | distributed      |            28 | 172.31.28.124 |     5432 |      40960
 r2         |  104337 | r2_104337  | distributed      |            28 | 172.31.84.95  |     5432 |      40960
 r2         |  104338 | r2_104338  | distributed      |            28 | 172.31.87.156 |     5432 |      49152
 r2         |  104338 | r2_104338  | distributed      |            28 | 172.31.95.174 |     5432 |      49152
 r2         |  104339 | r2_104339  | distributed      |            28 | 172.31.84.95  |     5432 |      49152
 r2         |  104339 | r2_104339  | distributed      |            28 | 172.31.28.124 |     5432 |      49152
 r2         |  104340 | r2_104340  | distributed      |            28 | 172.31.87.156 |     5432 |      49152
 r2         |  104340 | r2_104340  | distributed      |            28 | 172.31.95.174 |     5432 |      49152
The join output is retrieved from a single designated leader worker node, in accordance with the underlying data distribution.
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# explain (verbose on) select count(*) from r1 join r2 using (id) where r1.id = 5000;
                                                        QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM (public.r1_104329 r1(id, t_name) JOIN public.r2_104334 r2(id, t_name) USING (id)) WHERE (r1.id OPERATOR(pg_catalog.=) 5000)
         Node: host=172.31.95.174 port=5432 dbname=citus
         ->  Aggregate  (cost=10.16..10.17 rows=1 width=8)
               Output: count(*)
               ->  Nested Loop  (cost=0.29..10.16 rows=1 width=0)
                     ->  Index Only Scan using r1_pkey_104329 on public.r1_104329 r1  (cost=0.29..4.30 rows=1 width=8)
                           Output: r1.id
                           Index Cond: (r1.id = 5000)
                     ->  Seq Scan on public.r2_104334 r2  (cost=0.00..5.85 rows=1 width=8)
                           Output: r2.id, r2.t_name
                           Filter: (r2.id = 5000)
(16 rows)
If distributed tables share the same distribution column, we can co-locate them to enable efficient distributed joins and foreign key relationships. This means that shards covering the same hash range always reside on the same node.
citus=# CREATE TABLE r1 (
    id bigint,
    t_name text,
    PRIMARY KEY (id)
);
CREATE TABLE
citus=# CREATE TABLE r2 (
    id bigint,
    t_name text,
    PRIMARY KEY (id),
    FOREIGN KEY (id) REFERENCES r1 (id)
);
CREATE TABLE
citus=# SELECT create_distributed_table('r1', 'id');
 create_distributed_table
--------------------------

(1 row)

citus=# SELECT create_distributed_table('r2', 'id', colocate_with := 'r1');
 create_distributed_table
--------------------------
citus=# INSERT INTO r1 (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# INSERT INTO r2 (id, t_name) SELECT i, 'name_' || i FROM generate_series(1, 10000) AS i;
INSERT 0 10000
citus=# SELECT get_shard_id_for_distribution_column('r1', 5000);
 get_shard_id_for_distribution_column
--------------------------------------
                               104654
(1 row)

citus=# SELECT * FROM citus_shards WHERE shardid = 104654;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
(2 rows)
citus=# SELECT get_shard_id_for_distribution_column('r2', 5000);
 get_shard_id_for_distribution_column
--------------------------------------
                               104686
(1 row)

citus=# SELECT * FROM citus_shards WHERE shardid = 104686;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
(2 rows)
citus=# SELECT * FROM citus_shards WHERE table_name = 'r1'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r1         |  104651 | r1_104651  | distributed      |            38 | 172.31.84.95  |     5432 |      73728
 r1         |  104651 | r1_104651  | distributed      |            38 | 172.31.28.124 |     5432 |      73728
 r1         |  104652 | r1_104652  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r1         |  104652 | r1_104652  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r1         |  104653 | r1_104653  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
 r1         |  104653 | r1_104653  | distributed      |            38 | 172.31.28.124 |     5432 |      65536
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r1         |  104654 | r1_104654  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
citus=# SELECT * FROM citus_shards WHERE table_name = 'r2'::regclass;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 r2         |  104683 | r2_104683  | distributed      |            38 | 172.31.28.124 |     5432 |      73728
 r2         |  104683 | r2_104683  | distributed      |            38 | 172.31.84.95  |     5432 |      73728
 r2         |  104684 | r2_104684  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104684 | r2_104684  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r2         |  104685 | r2_104685  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
 r2         |  104685 | r2_104685  | distributed      |            38 | 172.31.28.124 |     5432 |      65536
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104686 | r2_104686  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r2         |  104687 | r2_104687  | distributed      |            38 | 172.31.28.124 |     5432 |      65536
 r2         |  104687 | r2_104687  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
 r2         |  104688 | r2_104688  | distributed      |            38 | 172.31.95.174 |     5432 |      65536
 r2         |  104688 | r2_104688  | distributed      |            38 | 172.31.87.156 |     5432 |      65536
 r2         |  104689 | r2_104689  | distributed      |            38 | 172.31.84.95  |     5432 |      65536
Query routing works much as before: as with reference tables, multi-table queries and join operations are pushed down and executed on a single worker.
citus=# SET citus.explain_all_tasks TO on;
SET
citus=# explain (verbose on) select count(*) from r1 join r2 using (id) where r1.id = 5000;
                                                        QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM (public.r1_104654 r1(id, t_name) JOIN public.r2_104686 r2(id, t_name) USING (id)) WHERE (r1.id OPERATOR(pg_catalog.=) 5000)
         Node: host=172.31.95.174 port=5432 dbname=citus
         ->  Aggregate  (cost=11.71..11.72 rows=1 width=8)
               Output: count(*)
               ->  Nested Loop  (cost=0.00..11.71 rows=1 width=0)
                     ->  Seq Scan on public.r1_104654 r1  (cost=0.00..5.85 rows=1 width=8)
                           Output: r1.id, r1.t_name
                           Filter: (r1.id = 5000)
                     ->  Seq Scan on public.r2_104686 r2  (cost=0.00..5.85 rows=1 width=8)
                           Output: r2.id, r2.t_name
                           Filter: (r2.id = 5000)
(16 rows)
Note: The key difference between reference tables and co-location is that with a reference table, the table is replicated in full to every worker node, making it locally available everywhere. With co-location, both tables are distributed across the nodes, but in such a way that related rows always reside on the same worker node.
For analytical or data warehousing needs, we can use the columnar storage format, where data is stored on disk by column rather than by row, in compressed form. Distributing a columnar table additionally spreads writes across the backend worker nodes.
citus=# CREATE TABLE col_test (
    id bigint,
    data jsonb not null
) USING columnar;
citus=# INSERT INTO col_test (id, data)
        SELECT d, '{"hello":"columnar"}' FROM generate_series(1,10000000) d;
INSERT 0 10000000
citus=# SELECT create_distributed_table('col_test', 'id');
NOTICE:  Copying data from local table...
NOTICE:  copying the data has completed
DETAIL:  The local data in the table is no longer visible, but is still on disk.
HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.col_test$$)
 create_distributed_table
--------------------------

(1 row)
citus=# SELECT * FROM citus_shards WHERE table_name = 'col_test'::regclass;
 table_name | shardid |   shard_name    | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+-----------------+------------------+---------------+---------------+----------+------------
 col_test   |  104715 | col_test_104715 | distributed      |            38 | 172.31.28.124 |     5432 |     516096
 col_test   |  104715 | col_test_104715 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
 col_test   |  104716 | col_test_104716 | distributed      |            38 | 172.31.95.174 |     5432 |     516096
 col_test   |  104716 | col_test_104716 | distributed      |            38 | 172.31.87.156 |     5432 |     516096
 col_test   |  104717 | col_test_104717 | distributed      |            38 | 172.31.28.124 |     5432 |     516096
 col_test   |  104717 | col_test_104717 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
 col_test   |  104718 | col_test_104718 | distributed      |            38 | 172.31.87.156 |     5432 |     516096
 col_test   |  104718 | col_test_104718 | distributed      |            38 | 172.31.95.174 |     5432 |     516096
 col_test   |  104719 | col_test_104719 | distributed      |            38 | 172.31.28.124 |     5432 |     516096
 col_test   |  104719 | col_test_104719 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
 col_test   |  104720 | col_test_104720 | distributed      |            38 | 172.31.87.156 |     5432 |     516096
 col_test   |  104720 | col_test_104720 | distributed      |            38 | 172.31.95.174 |     5432 |     516096
 col_test   |  104721 | col_test_104721 | distributed      |            38 | 172.31.84.95  |     5432 |     516096
Query routing for columnar tables works the same as for the other options we explored earlier; only the underlying scan node changes.
citus=# SET citus.explain_all_tasks TO on;
citus=# explain (verbose on) select count(*) from col_test where col_test.id = 10000;
                                               QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Output: remote_scan.count
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Query: SELECT count(*) AS count FROM public.col_test_104741 col_test WHERE (id OPERATOR(pg_catalog.=) 10000)
         Node: host=172.31.84.95 port=5432 dbname=citus
         ->  Aggregate  (cost=13.62..13.63 rows=1 width=8)
               Output: count(*)
               ->  Custom Scan (ColumnarScan) on public.col_test_104741 col_test  (cost=0.00..9.71 rows=1564 width=0)
                     Filter: (col_test.id = 10000)
                     Columnar Projected Columns: id
                     Columnar Chunk Group Filters: (id = 10000)
(13 rows)
Next, let’s look at read scaling by targeting secondary worker nodes. By default, queries against a distributed table (here, dist_test, spread over 32 shards) are routed to the primary workers:

citus=# SET citus.explain_all_tasks TO on;
SET
citus=# EXPLAIN (VERBOSE ON) SELECT count(*) FROM dist_test;
                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8)
   Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Output: remote_scan.count
         Task Count: 32
         Tasks Shown: All
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102040 dist_test WHERE true
               Node: host=172.31.28.124 port=5432 dbname=citus
               ->  Aggregate  (cost=31.05..31.06 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102040 dist_test  (cost=0.00..27.24 rows=1524 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102041 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=31.05..31.06 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102041 dist_test  (cost=0.00..27.24 rows=1524 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102042 dist_test WHERE true
               Node: host=172.31.28.124 port=5432 dbname=citus
               ->  Aggregate  (cost=31.05..31.06 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102042 dist_test  (cost=0.00..27.24 rows=1524 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102043 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=29.26..29.27 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102043 dist_test  (cost=0.00..25.21 rows=1621 width=0)
                           Output: id, t_name
This behavior is controlled by citus.use_secondary_nodes, which defaults to never:

citus=# SHOW citus.use_secondary_nodes;
 citus.use_secondary_nodes
---------------------------
 never
(1 row)
To route reads to the standbys, set it to 'always', either through the Patroni configuration:

parameters:
  citus.use_secondary_nodes: 'always'
Or
postgres=# ALTER SYSTEM SET citus.use_secondary_nodes TO 'always';
postgres=# SELECT pg_reload_conf();
postgres=# SHOW citus.use_secondary_nodes;
 citus.use_secondary_nodes
---------------------------
 always
(1 row)
The same query is now routed to the secondary worker nodes:

citus=# SET citus.explain_all_tasks TO on;
SET
citus=# EXPLAIN (VERBOSE ON) SELECT count(*) FROM dist_test;
                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.02 rows=1 width=8)
   Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Output: remote_scan.count
         Task Count: 32
         Tasks Shown: All
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102040 dist_test WHERE true
               Node: host=172.31.84.95 port=5432 dbname=citus
               ->  Aggregate  (cost=28.34..28.35 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102040 dist_test  (cost=0.00..24.47 rows=1547 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102041 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=28.33..28.34 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102041 dist_test  (cost=0.00..24.46 rows=1546 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102042 dist_test WHERE true
               Node: host=172.31.84.95 port=5432 dbname=citus
               ->  Aggregate  (cost=28.06..28.07 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102042 dist_test  (cost=0.00..24.25 rows=1525 width=0)
                           Output: id, t_name
         ->  Task
               Query: SELECT count(*) AS count FROM public.dist_test_102043 dist_test WHERE true
               Node: host=172.31.95.174 port=5432 dbname=citus
               ->  Aggregate  (cost=29.26..29.27 rows=1 width=8)
                     Output: count(*)
                     ->  Seq Scan on public.dist_test_102043 dist_test  (cost=0.00..25.21 rows=1621 width=0)
                           Output: id, t_name
A caveat: while citus.use_secondary_nodes is 'always', write operations fail because they get routed to read-only standbys. Given the current cluster topology, where 172.31.95.174 is now a standby:

[root@ip-172-31-89-51 ~]# /usr/local/bin/patronictl -c /etc/patroni.yml list citus
+ Citus cluster: citus ----+---------------+----------------+-----------+----+-----------+
| Group | Member          | Host          | Role           | State     | TL | Lag in MB |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
|     0 | Coordinator     | 172.31.89.51  | Leader         | running   |  9 |           |
|     1 | Worker1         | 172.31.95.174 | Quorum Standby | streaming |  7 |         0 |
|     1 | Worker1Replica  | 172.31.87.156 | Leader         | running   |  7 |           |
|     2 | Worker2         | 172.31.28.124 | Leader         | running   |  8 |           |
|     2 | Worker2 Replica | 172.31.84.95  | Quorum Standby | streaming |  8 |         0 |
+-------+-----------------+---------------+----------------+-----------+----+-----------+
citus=# SELECT create_distributed_table('t5', 'id');
ERROR:  cannot execute CREATE TABLE in a read-only transaction
CONTEXT:  while executing command on 172.31.95.174:5432
To run write operations again, we can override the setting for a single session via PGOPTIONS:

bash-5.1$ export PGOPTIONS="-c citus.use_secondary_nodes=never"
bash-5.1$ psql -U postgres -d citus
Password for user postgres:
psql (16.9)
Type "help" for help.

citus=# SHOW citus.use_secondary_nodes;
 citus.use_secondary_nodes
---------------------------
 never
(1 row)
citus=# SELECT create_distributed_table('t5', 'id');
 create_distributed_table
--------------------------

(1 row)
If we want to move a specific shard to a different worker node, we can use the citus_move_shard_placement() function.
citus=# SELECT get_shard_id_for_distribution_column('t1', 1000);
 get_shard_id_for_distribution_column
--------------------------------------
                               104756
(1 row)
citus=# SELECT * FROM citus_shards WHERE shardid = 104756;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.87.156 |     5432 |      65536
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.95.174 |     5432 |      65536
(2 rows)
citus=# SELECT citus_move_shard_placement(104756, '172.31.95.174', 5432, '172.31.84.95', 5432);
 citus_move_shard_placement
----------------------------

(1 row)
citus=# SELECT * FROM citus_shards WHERE shardid = 104756;
 table_name | shardid | shard_name | citus_table_type | colocation_id |   nodename    | nodeport | shard_size
------------+---------+------------+------------------+---------------+---------------+----------+------------
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.28.124 |     5432 |      65536
 t1         |  104756 | t1_104756  | distributed      |            39 | 172.31.84.95  |     5432 |      65536
(2 rows)
We can also rebalance shards when the data distribution is uneven and one worker holds more shards than the others.
citus=# select nodename, count(*) from citus_shards group by nodename;
   nodename    | count
---------------+-------
 172.31.95.174 |    11
 172.31.87.156 |    11
 172.31.28.124 |    21
 172.31.84.95  |    21
(4 rows)
citus=# SELECT rebalance_table_shards('t1');
NOTICE:  Moving shard 104747 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
NOTICE:  Moving shard 104749 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
NOTICE:  Moving shard 104750 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
NOTICE:  Moving shard 104751 from 172.31.84.95:5432 to 172.31.95.174:5432 ...
 rebalance_table_shards
------------------------
citus=# select nodename, count(*) from citus_shards group by nodename;
   nodename    | count
---------------+-------
 172.31.95.174 |    15
 172.31.87.156 |    15
 172.31.28.124 |    17
 172.31.84.95  |    17
(4 rows)
This blog post covered the key Citus data distribution methods, their functions, and how queries are routed under each one. Another feature, create_distributed_table_concurrently(), allows table distribution without downtime and can be beneficial when an existing table/data set needs to be distributed across worker nodes: the usual create_distributed_table() path can block DML until the sharding process completes. We also demonstrated how to improve read scalability by routing read queries to secondary worker nodes. Finally, we touched on shard rebalancing and shard movement, both of which play a crucial role in data stability and performance.
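For completeness, a sketch of the concurrent variant mentioned above (assumes a recent Citus version; the events table and device_id column are hypothetical):

```sql
-- Distribute an existing table without blocking writes for the whole
-- duration; DML continues while data is copied to the shards.
SELECT create_distributed_table_concurrently('events', 'device_id');
```

Like other concurrent operations in PostgreSQL, it cannot run inside a transaction block.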
This blog is purely based on my evaluation of the Citus extension and does not reflect Percona’s recommendations/suggestions. Again, I would emphasize that before using any specific distribution model, please carefully assess its pros and cons and evaluate possible support coverage. Testing in a non-production environment is highly recommended to better understand the model and avoid unforeseen issues in production later on.