October 25, 2014

Parallel Query for MySQL with Shard-Query

While Shard-Query can work over multiple nodes, this blog post focuses on using Shard-Query with a single node.  Shard-Query can add parallelism to queries which use partitioned tables.  Very large tables can often be partitioned fairly easily. Shard-Query can leverage partitioning to add parallelism, because each partition can be queried independently. Because MySQL 5.6 supports the partition hint, Shard-Query can add parallelism to any partitioning method (even subpartitioning) on 5.6, but it is limited to RANGE/LIST partitioning methods on earlier versions.
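As a rough illustration of the per-partition idea (this is not Shard-Query's actual rewriter, which is written in PHP), the sketch below builds one query per partition using the MySQL 5.6 `PARTITION (...)` selection syntax. The helper name `partition_queries` is hypothetical, and the partition names assume the defaults (`p0`, `p1`, ...) that MySQL assigns when HASH partitions are not named explicitly.

```python
def partition_queries(select_list, table, partitions, where=None):
    """Build one SELECT per partition using the MySQL 5.6 PARTITION (...) hint."""
    queries = []
    for name in partitions:
        sql = f"SELECT {select_list} FROM {table} PARTITION ({name})"
        if where:
            sql += f" WHERE {where}"
        queries.append(sql)
    return queries


# Assumption: 8 HASH partitions with the default names p0..p7.
names = [f"p{i}" for i in range(8)]
queries = partition_queries("COUNT(*) AS cnt", "lineorder", names)
for q in queries:
    print(q)
```

Each generated statement touches exactly one partition, which is what allows the statements to run independently of one another.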

The output shown here comes from the Shard-Query command-line client, but you can also use MySQL Proxy to communicate with Shard-Query.

In the examples I am going to use the schema from the Star Schema Benchmark.  I generated data for scale factor 10, which means about 6GB of data in the largest table. I am going to show a few different queries, and explain how Shard-Query executes them in parallel.

Here is the DDL for the lineorder table, which I will use for the demo queries:

Notice that the lineorder table is partitioned by HASH(LO_OrderDateKey) into 8 partitions.  I used 8 partitions and my test box has 4 cores. It does not hurt to have more partitions than cores. A number of partitions that is two or three times the number of cores generally works best because it keeps each partition small, and smaller partitions are faster to scan. If you have a very large table, a larger number of partitions may be acceptable. Shard-Query will submit a query to Gearman for each partition, and the number of Gearman workers controls the parallelism.
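To make the relationship between partitions and workers concrete, here is a minimal sketch that uses a Python thread pool as a stand-in for Gearman workers. Nothing here talks to Gearman or MySQL: `scan_partition` is a hypothetical placeholder that just sleeps, and the numbers (8 partitions, 4 workers) mirror the setup described above.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

NUM_PARTITIONS = 8   # one task per partition
NUM_WORKERS = 4      # stand-in for the number of Gearman workers

lock = threading.Lock()
running = 0
peak = 0


def scan_partition(partition_id):
    """Placeholder for one per-partition query; sleeps instead of querying MySQL."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)
    with lock:
        running -= 1
    return partition_id


with ThreadPoolExecutor(max_workers=NUM_WORKERS) as pool:
    finished = list(pool.map(scan_partition, range(NUM_PARTITIONS)))

print(f"partitions scanned: {len(finished)}, peak concurrency: {peak}")
```

All eight tasks complete, but never more than four run at once: the partition count sets how finely the work is divided, while the worker count caps how much of it runs at the same time.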

The SQL for the first demo is:

Here is the explain from regular MySQL:

 

So it is basically a full table scan. It takes a long time:

 

Shard-Query executes this query differently from MySQL. It sends a query to each partition, in parallel like the following queries:

You will notice that there is one query for each partition.  Those queries will be sent to Gearman and executed in parallel by as many Gearman workers as possible (in this case, 4).  The output of the queries goes into a coordinator table, and then another query does a final aggregation.  That query looks like this:

The Shard-Query time:

That isn’t a typo: it really is sub-second, compared to minutes in regular MySQL.

This is because Shard-Query uses GROUP BY to answer this query, which makes a loose index scan of the PRIMARY KEY possible:

Next another simple query will be tested, first on regular MySQL:

Again, the EXPLAIN shows a full table scan:

Now, Shard-Query can’t do anything special to speed up this query, except to execute it in parallel, similar to the first query:

The aggregation SQL is similar, but this time the aggregate function is changed to SUM to combine the COUNT from each partition:
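To illustrate why the final step uses SUM rather than COUNT, here is a minimal sketch that uses an in-memory SQLite table as a stand-in for the coordinator table. The table name `coordinator`, the column name `cnt`, and the partial counts are all made up for illustration; Shard-Query's real coordinator table lives in MySQL and is populated differently.

```python
import sqlite3

# Hypothetical per-partition COUNT(*) results (made-up numbers, one per partition).
partial_counts = [120, 95, 110, 102, 98, 131, 87, 99]

conn = sqlite3.connect(":memory:")  # stand-in for the coordinator database
conn.execute("CREATE TABLE coordinator (cnt INTEGER)")
conn.executemany(
    "INSERT INTO coordinator (cnt) VALUES (?)",
    [(c,) for c in partial_counts],
)

# SUM adds up the partial counts; COUNT would only count the partial rows.
total = conn.execute("SELECT SUM(cnt) FROM coordinator").fetchone()[0]
rows = conn.execute("SELECT COUNT(*) FROM coordinator").fetchone()[0]
conn.close()

print(f"SUM of partial counts: {total}, rows in coordinator table: {rows}")
```

Running COUNT over the coordinator table would only report how many partitions answered, so the per-partition COUNT values have to be combined with SUM to recover the full-table count.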

And the query is quite a bit faster, at 140.24 seconds compared with MySQL’s 248.7 seconds:

Finally, I want to look at a more complex query that uses joins and aggregation.

Here is the query on regular MySQL:

Again, Shard-Query splits up the query to run over each partition (I won’t bore you with the details) and it executes the query faster than MySQL, in 343.3 seconds compared to ~720 seconds:

I hope you see how using Shard-Query can speed up queries without using sharding, on just a single server. All you really need to do is add partitioning.

You can get Shard-Query from GitHub at http://github.com/greenlion/swanhart-tools

Please note: Configure and install Shard-Query as normal, but simply use one node and set the column option (the shard column) to “nocolumn” or false, because you are not required to use a shard column if you are not sharding.

About Justin Swanhart

Justin is a Principal Support Engineer on the support team. In the past, he was a trainer at Percona and a consultant. Justin also created and maintains Shard-Query, a middleware tool for sharding and parallel query execution and Flexviews, a tool for materialized views for MySQL. Prior to working at Percona Justin consulted for Proven Scaling, was a backend engineer at Yahoo! and a database administrator at Smule and Gazillion games.

Comments

  1. Patryk Pomykalski says:

    Are the results always consistent?
    From the wiki: “Shard-Query behaves like READ-COMMITTED with respect to each query.” Does it mean the full query or queries on each partition?

  2. The scan of each partition is consistent. You could see changing data if a partition is changed before Shard-Query starts querying it.

  3. john says:

    I set up Shard-Query and ran a simple query (select count(*) from mtrack_log_error) using php run_query, and got an error. The query was rewritten to the following (the WHERE clause comes after the AND):
    Array
    (
    [0] => SELECT COUNT(*) AS expr_1564968823
    FROM mtrack_log_error AS mtrack_log_error AND UNIX_TIMESTAMP(time) SELECT COUNT(*) AS expr_1564968823
    FROM mtrack_log_error AS mtrack_log_error AND UNIX_TIMESTAMP(time) >= (1396328400) AND UNIX_TIMESTAMP(time) SELECT COUNT(*) AS expr_1564968823
    FROM mtrack_log_error AS mtrack_log_error AND UNIX_TIMESTAMP(time) >= (1398920400) AND UNIX_TIMESTAMP(time) < (1401598800) WHERE 1=1 AND 1=1
    )

    the table was partitioned by time

  4. Justin Swanhart says:

    I checked the fix for this problem into git. Grab the latest version at http://github.com/greenlion/swanhart-tools

  5. john says:

    Thanks for the fix. There is no longer an SQL syntax error, but I still have a problem when the query is rewritten to
    SELECT COUNT(*) AS count(*)
    FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) explain partitions select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) explain partitions select count(*) from mtrack_log_error where 1=1 AND time < from_unixtime(1396328400) ORDER BY NULL;
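    +----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
    | id | select_type | table            | partitions | type  | possible_keys | key        | key_len | ref  | rows     | Extra                    |
    +----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+
    |  1 | SIMPLE      | mtrack_log_error | p1403      | index | NULL          | error_time | 6       | NULL | 88128625 | Using where; Using index |
    +----+-------------+------------------+------------+-------+---------------+------------+---------+------+----------+--------------------------+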

    so it will only scan one partition.

    Thanks

  6. john says:

    Not sure why my previous post is a mess; it lost a lot of text. What I mean is that the query

    select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) < (1396328400) ORDER BY NULL;

    should be changed to

    select count(*) from mtrack_log_error where 1=1 AND time < from_unixtime(1396328400) ORDER BY NULL;

    so that it scans only one partition instead of all partitions.

    Thanks

  7. john says:

    I also got the error below in the final stage:

    ERRORS RETURNED BY OPERATION:
    Array
    (
    [0] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
    [1] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (95701615) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
    [2] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (0) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
    )
    no query results
    Exec time: 192.84175181389

  8. Justin Swanhart says:

    I think you are having problems because “time” is a function name in MySQL, and you are also using it as a column name. Shard-Query does not pass functions down to the shards, only columns, and it thinks that time is a function rather than a column.

    I will need to think about how to fix this.

  9. john says:

    Do you mean the partition problem? if you change from UNIX_TIMESTAMP(time) < (1396328400) to time [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]

    Thanks

  10. Justin Swanhart says:

    If the query only accesses one partition due to the WHERE clause, then Shard-Query won’t scan all the partitions as there is no data of interest in the other partitions.

  11. john says:

    The table has three partitions (three months of data). When I run the query select count(*) from mtrack_log_error, it is rewritten into three queries, like the one below:

    select count(*) from mtrack_log_error where 1=1 AND UNIX_TIMESTAMP(time) = (1396328400) and UNIX_TIMESTAMP(time) = (1398920400) and UNIX_TIMESTAMP(time) <(1401598800) ORDER BY NULL;

    Even though each query returns only one month of data, it still scans all three partitions, because it applies UNIX_TIMESTAMP(time) < (1396328400), which is like using a function on an indexed column. It should be changed to time < from_unixtime(1396328400).

  12. john says:

    I still get the error below in the final stage, and I am not sure how to fix it:

    ERRORS RETURNED BY OPERATION:
    Array
    (
    [0] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (144897986) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
    [1] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (95701615) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
    [2] => [message:Error while inserting: INSERT INTO aggregation_tmp_5974337 VALUES (0) ON DUPLICATE KEY UPDATE count(*)=count(*) + VALUES(count(*)):1317Query execution was interrupted] [node:shard1] [arc:0] [insert_id:0]
    )
    no query results
    Exec time: 192.84175181389

  13. Justin Swanhart says:

    It looks like you are using an old version of Shard-Query. The latest version should print --verbose output like this:
    SQL TO SEND TO SHARDS:
    Array
    (
    [0] => SELECT COUNT(*) AS expr_2828757189
    FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) SELECT COUNT(*) AS expr_2828757189
    FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) >= (1396328400) AND UNIX_TIMESTAMP(time) SELECT COUNT(*) AS expr_2828757189
    FROM mtrack_log_error AS mtrack_log_error WHERE 1=1 AND UNIX_TIMESTAMP(time) >= (1398920400) AND UNIX_TIMESTAMP(time) 2
    )
    1 rows returned
    Exec time: 0.019656181335449

    You can get the latest version from http://github.com/greenlion/swanhart-tools

  14. john says:

    I downloaded Shard-Query from http://github.com/greenlion/swanhart-tools on June 2, so I am not sure whether it is the old version or not. I have downloaded it again. Do I just need to copy shard-query to /usr/shar/, or do I need to redo all of the setup after the copy?

    Thanks
