# Distributed Set Processing with Shard-Query

## Can Shard-Query scale to 20 nodes?

Peter asked this question in comments to to my previous Shard-Query benchmark. Actually he asked if it could scale to 50, but testing 20 was all I could due to to EC2 and time limits. I think the results at 20 nodes are very useful to understand the performance:

I will shortly release another blog post focusing on ICE performance at 20 nodes, but before that, I want to give a quick preview, then explain exactly how Shard-Query works.

Yes, Shard-Query scales very well at 20 nodes

## Distributed set processing (theory)

### What is SQL?

As you probably know, SQL stands for “structured query language”. It isn’t so much the language that is structured, but actually the data. Every SQL statements breaks down into a relational algebra equation. In Algebra you learn that some operations are “distributable”, that is, you can split them up into smaller units, and when you put those units back together the result is the same as if you didn’t take it apart. Projection, GROUP BY, SUM, COUNT, MIN*, and MAX* are distributable. With a little elbow grease, all non-distributable aggregate functions (AVG,STDDEV,VARIANCE,etc) can be decomposed into distributable functions using simple substitution rules.

So, to recap, every SQL query is really a cleverly disguised relational algebra mathematical expression. With relational algebraic substitution, every aggregate expression, even non-distributable ones, can be broken into distributable sub-expressions.

### What is a result set?

This isn’t really a trick question. The “result set” is a SET created by the output of a relational algebra expresion. Relational algebra expressions always produce sets as output. Just to drive it home, SQL is relational algebra, and this algebra only operates on sets. The next important thing to understand about SQL is that it is declarative. That is, you tell the database what you want but not how to get it. Most distributed engines work with rows. They break the queries up into the lowest level sets possible (rows) which doesn’t makes much sense to me, since SQL is set oriented. I just said repeatedly that SQL doesn’t work on rows! Why would you break this paradigm by passing around rows instead of sets? From a distributed processing standpoint, rows are the worst case for performance. Optimal mathematical performance requires operations on reduced sets. Keep in mind, that rows based systems work well but still, these systems are much farther from optimal than working directly with relational algebra.

### Materialized views techniques applied to distributed computation

I maintain another open source tool called Flexviews which supports incrementally maintaining materialized views. While writing Flexviews, I learned how to distribute the computation of aggregation queries over time. I realised that I could apply these same mathematical concepts to distributed queries as well, but with some minor differences.

Having written Flexviews, I understood that there are special materialized view optimizations that can be applied to INSERT-only workloads, and their are other optimizations that can be applied to views that are based on only a single base table. Knowing this, the query result set is treated as a materialized view over a union of all the already joined and aggregated data from all the nodes. A single temporary table is used to store the results from all the nodes. Since we are projecting results, this is naturally an INSERT-only workload. The insertions into the base table from each node correspond logically to a records in a Flexviews materialized view delta table and thus the logic for applying changes via ON DUPLICATE KEY UPDATE is the same. All of the fastest incremental materialized view optimizations can be applied.

### Shard-Query works only on sets

Shard-Query takes relational algebra to its logical maximum conclusion, splitting a problem up into many small problems and then putting the results back together again. This set logic allows to multiple levels of reduction of the set, all in parallel. All joins, aggregation and filtering is done at the storage node level. This means that Shard-Query does not have to have any idea of the structure of the data on which your queries operate. It can, however, use a mapper for partition elimination. The mapper is pluggable.

On each storage node (or a mapped subset of nodes) the result set is aggregated using distributable aggregate functions. The results from this aggregation get a second distributed reduce using UPSERTs into the coordination node when maintaining the “materialized view” of the results. Finally, a single threaded final group-by reduction is run over the coordination node, and this projects the final result set.

### Intern-node communication

One or more gearman queues are used to compute work. Computation is massively parallel. Shard-Query simply waits for all the queries to finish, then projects the synchronously maintained incrementally refreshable materialized view that represents the output. There is no external locking or synchronization required. If a query fails on a storage node, then it can be retried on the same node, or in the future, a redundant node.

## Work on problems of any size.

### Set processing is massively parallel

.
In fact, it is embarassingly parallel. Because Shard-Query works on sets, and features pluggable partition mapping, it allows partitioning resources to any depth and distributing a query over that set of resources. Lets say you have a database system that is at capacity in data center A, and there is not enough power to add new nodes there. You can add new nodes in data center B and distribute the queries over both data centers. The response time should only be increased by the average latency, since work is done in parallel and there is very little data shipping because of the distributed reduction discussed above. If you have any limitation in resources in a cluster (cpu, memory, disk, power,etc) then split the problem up into more chunks.

Each Shard-Query instance is a essentially a proxy for the distributed set based SQL operations executed on the nodes under the proxy. The databases do 99.9% of the work, due to the multiple levels of result set reduction. Finally, Shard-Query can automatically partition sets into subsets using basic relational algebra substitution rules that are documented in this blog post. This allows BETWEEN, IN, subqueries in the FROM clause, and UNION operations to operate fully in parallel.

### Distributed set processing is database agnostic.

Keep in mind, at each level of partitioning only a list of servers and some credentials are required to make this work. It can even be set up without the aid of your database administrator.

As long as all of the nodes share a common schema model and share a common SQL dialect, then they can all participate in the distributed query processing. There is almost no data shipping, as only aggregated sets are sent over the network. In the future you will be able to distribute work over any type of compute resource which speaks SQL, but right now only MySQL storage nodes are supported. Amdahl’s law applies to the distributed processing. The results from the slowest node will place a lower bound on the minimum performance level.

Soon you will be able to set up computation over ICE, InnoDB, Vectorwise and other databases, all at the same time, and transparently to the source query. Adding a new storage node SQL dialect is almost trivial. I’ll document that process shortly, I think I need to make some minor abstraction modifications in the data access layer.

### Shard-Query can provide query execution plans based on the relational algebra rewrites

So, here is an example of such a plan for a simple query with aggregation, a JOIN, and a WHERE clause that uses an IN clause:

Edit

*MIN/MAX are only distributable in INSERT-only workloads

See this information about maintaining materialized views:

1. Justin Swanhart says

Eventually a proxy that fronts the framework would allow enterprise apps to interact with it as if it were a regular MySQL database server. This will allow Mondrian, etc, to work with it transparently as well.

2. Justin Swanhart says

3. Hung Phan says

Good job, good start! Taking your work and idea (with ICE) to another level with the distributed GlusterFS (HA and rebalancing) and MPI on Infiniband. It will get embarassingly (parallel) fast

Justin, of course, there is a large class of OLAP applications which can be reimplemented using Shard-Query framework. This is a potentially great approach to building low-cost clustered analytical databases. There are some limitations (like no support for constellation type of schema) but many companies can live with these limitations. Just one comment: Shard-Query is quite difficult to integrate into regular enterprise application where PHP is a third class citizen in a best case (frankly speaking, I have never heard of PHP being used in an enterprise applications).

5. Justin Swanhart says

Regardless, Shard-Query suits the class of problems that I want to target. I want to be able to provide fast SQL based analytics on a star schema for ROLAP based processing while spreading the work over whatever database technology is available. In the case where a dimension would be too long, simply make it a degenerate dimension, or fully denormalize for ultimate performance.

6. Justin Swanhart says

There are more efficient optimizations for the non-distributable aggregates, but one can only work on so many things before real-world testing.

7. Justin Swanhart says

Pushing into the group by would normally be a problem, but the incremental maintenance of the #tmp table addresses that issue (the UPSERT).

8. Justin Swanhart says

percentile can use GROUP_CONCAT on mysql. Flexviews has native support for asynchronous maintenance of materialized views with all aggregate fuctions + count(distinct) and Shard-Query can distribute count(distinct) over a cluster. My benchmark tests this extensively if you take a look at the queries.

9. Justin Swanhart says

COUNT(distinct), percentile and median can be decomposed into distributable forms.

MEAN can be turned into SUM/COUNT
STDDEV, etc can be pushed into the GROUP BY at the shard level, and then the collected set from all the shards has a valid list of distinct values to evaluate.

10. J. Andrew Rogers says

Sharding with value ranges is essentially a distributed version of B+Trees and has the same moderators on concurrency. Massively parallel databases that support both hash partitioning and range partitioning always warn that the latter will limit scale-out. It works okay at modest levels of distribution; if the database is read-only then this obviously does not matter much outside of load time.

As you point out, you can materialize joins by sharding the hierarchy at each level so that locality is preserved. That is roughly what a document database does and some commercial databases (e.g. Oracle) have supported this optimization since the 1990s. The problem is that this structure makes queries that do not strictly adhere to that hierarchical traversal very inefficient. Obviously this works for some cases (see: MongoDB).

The subtle point on joins is that join algorithms are embarrassingly parallel on some network topologies, just not on a switch fabric topology. Even if the operations shard, the network won’t. It is the reason that graph databases, which are built on selection and equi-join operators, are not usefully shardable even though they seem like they should be.

What you are doing will work up to a point, your technique has a lot of history. It is how parallel databases like Teradata work under the hood. I’m not trying to give you a hard time but the devil is in the details when it comes to MPP database engines. The first lesson everyone (myself included) learns the hard way because it is not intuitive is that some embarrassingly parallel algorithms are not parallel on a switch fabric. This turns out to be a major limitation (not surprisingly, very familiar to the HPC software community).

Your technique can efficiently scale anything that will efficiently scale with MapReduce so there is clear value. I prefer your way to be honest.

11. Justin Swanhart says

Flexviews allows you to efficiently maintain the aggregate tables over time based only what changed in the database. A sufficiently sharded database will have a small amount of data in each node, and you can make sure this is so by using multiple tiers.

If you want to replicate data between nodes, Tungsten offers external replication features for most databases.

12. Justin Swanhart says

Vlad yes. You can use map-reduce on top this this if you want to, or you can just fully denormalize. All the “dimensions” will just end up being built as aggregate tables and mondrian can use those transparently anyway.

Justin,

As far as I understood, all dimension tables in your test are replicated among all nodes. Otherwise, joining fact table (partitioned) with another partitioned table will require totally different technique which is hardly implemented in Shard-Query. Am I right?

“With relational algebraic substitution, every aggregate expression, even non-distributable ones, can be broken into distributable sub-expressions”

This is true only for analytical aggregates and does not hold on holistic ones (i.e. median, distinct, percentiles).

15. Justin Swanhart says

Logically, Shard-Query turns any turing compute cluster into an SIMD cluster for relational, and possibly general computing purposes. The computation engine is distributed and the SQL is treated as a program which is run on the virtualized SIMD system. The system is data flow oriented for parallelism.

16. Justin Swanhart says

Last comment for now:
Because each tier is trivial to set up, and because hash partitions support range conditions, it is easiest just to hash the data instead of relying on directory based lookups. If you do want to use a directory service, then directory service lookups can be done in parallel by fronting the directory with shard-query and there wont’ be much performance drop. This allows you to be able to add nodes in real time to any level, and you can also split nodes. Both modes will have to be tested to see which performs better.

17. Justin Swanhart says

My method uses only UNION operations, and those operations happen only on small amounts of data because the data is pre-aggregated before the union operation. The partition elimination is just an extra N levels of map before the distributed reduce in the aggregation operations. In addition, the UNION aggregation operations happen in parallel using UPSERT. There is a unique key over the unique attributes of the union.

18. Justin Swanhart says

The best part is, that because the system is database and data set agnostic, the layers don’t even know they are layers.

Oh, and the materialized views are database agnostic too, just somewhat tightly coupled with MySQL at the moment.

19. Justin Swanhart says

And you can still partition an extra two levels deep after the sharding using the RDBMS and whatever scaling technology it uses. You can front N Greenplum or Exadata clusters with Shard-Query, N levels deep. When partition elimination isn’t used, the entire cluster is accessed in parallel. The system is embarrassingly parallel. It also allows you to add a form of “logical partitioning”. Shard-Query can convert BETWEEN -> IN and IN -> parallel equality lookups. You can push a between clause into every query that forces a range which always evaluates to true like “date_id between 1 and 8401″. This also allows effective range scans on hash indexes since the range is treated as a union of equalities (think in sets).

If an IN list is greater than –inlist-merge-threshold items (default 128) then the inlist will be split into many IN lists, each of –inlist-merge-size size. This allows you to add extra parallelism when the dataset fits in memory. I believe this will greatly improve performance when I test vectorwise.

You can also put materialized views on every database. I support ALL aggregation functions, and Shard-Query will soon be able to add aggregate tables and do query rewrites automatically. Once you run a query it will literally never be slow again because I can maintain the views behind the scenes efficiently (fast refresh). Since I can support efficient refresh, I can support insert only workloads if delta records are inserted into the database.

My math says this can scale to any size. Please prove me wrong.

20. Justin Swanhart says

This benchmark does not test partition elimination. A sharded system is no different from a RDBMS partitioned system in terms of this algorithm. Partition elimination can improve performance by orders of magnitude over these results. Unlike RDBMS partitioning which can only shard by two levels (partition by range|list, subpartition by hash) Shard-Query can subpartition to any N levels, and every level can have hundreds of machines.

21. Justin Swanhart says

For what set of SQL problems will my method not work on?

Assuming you are SaaS and shard by customer, then by date, where does the join performance break down? With my system I can still aggregate across all customers, and I can eliminate vast swaths of data and only join tiny amounts when I look at data for a range of dates for a specific customer.

22. J. Andrew Rogers says

You are correct, strictly speaking. Join operators are set oriented but are derived more from category theory (of sets) than set theory per se. They define sets of relationships rather than sets of entities.

To be clear, join algorithms can be sharded and in one uselessly narrow case they will even scale linearly. However, most sharded joins built from set algebras and logic have a strong sublinear scaling character even though they shard easily and, as you note above, self-joins are so sublinear that there is no value in sharding the algorithms at all. I was speaking more to practical limitations on sharding joins.

(There are unpublished algorithms that will shard a self-join with linear scalability — I’ve seen two — but those relational algebras are built from topology theory rather than the more obvious set theory.)

23. Justin Swanhart says

schema:
create table t1
( c1 int primary key,
c2 int
);

create table t2
( id int primary key,
t1_c1 int
);

–given the query
select c1 expr1, sum(c2) expr2, count(*) expr3
from t1
join t2
where t2.t1_c1 = t1.c1
group by c1;

–Each of the following runs concurrently. When each query completes, the results will be
–sent to a reduction table (with incremental materialization optimizations) we’ll call #tmp
–to node1:
select c1 as expr1, sum(c2) as expr, sum(1) as expr3
from t1
join t2
where t2.t1_c1 = t1.c1
group by 1;

–to node2 (note the query is identical):
select c1 as expr1, sum(c2) as expr2, sum(1) as expr3
from t1
join t2
where t2.t1_c1 = t1.c1
group by 1;

–When they all complete, the results of this query are sent to user:
select expr1, sum(expr2) expr2, sum(expr3) expr3
from #tmp
group by 1;

–Clean up
drop table #tmp;

24. Justin Swanhart says

If you are thinking about joins, then you aren’t thinking in sets.

25. Justin Swanhart says

My approach does elegantly distribute the join over all the nodes as long as the data is sharded (that is, shared nothing). You can’t do a self join across partitions.

26. J. Andrew Rogers says

While I agree that database operations can (and should) be decomposed into an elegantly parallelizable relational algebra, the major limitation is that relational join operations are not efficiently distributable which limits generality of sharding approaches to databases.

Join algorithms are canonical examples of the difference between data parallel algorithms (i.e. algorithms that only scale on a network where every node is directly connected to every other node) and disjoint-access parallel algorithms (i.e. algorithms that scale on a network where every node is connected to a non-blocking switch fabric). Standard join algorithms only distribute efficiently in the directly connected topology model but all of our clusters tend to be switch fabrics, hence why joins are a perennial scaling problem for parallel databases.