The ClustrixDB Query Evaluation Model

Recently we’ve started to dig into the internals of ClustrixDB, beginning with “How ClustrixDB Accomplishes Horizontal Scaling of Both Writes & Reads Without Sharding.” Next, we dug into the details of the multi-patented ClustrixDB Rebalancer. This time we will discuss the ClustrixDB Query Evaluation Model.

ClustrixDB is a MySQL-compatible distributed RDBMS that provides linear scale-out of both writes and reads while maintaining relational semantics, including ACID transactionality and referential integrity. Typically, MySQL workloads can scale out both writes and reads only if sharding is used. Sharding is the strategy of partitioning a MySQL application workload across multiple separate MySQL database servers, allowing queries and data CRUD operations to fan out. This means multiple separate physical MySQL servers must be deployed, the workload data must be partitioned across them, and the application must be rewritten to manage any ACID transactionality needed between those servers. ClustrixDB provides linear scale-out similar to that of sharding, but the data distribution is handled automatically behind the scenes by the multi-patented ClustrixDB Rebalancer. The application doesn’t require rewrites and sees only a single logical RDBMS, while all cross-node ACID transactionality is handled automatically.

In short, ClustrixDB can linearly scale out via a combination of the following:

  • Horizontal Scaling Secret #1: Automatic Data Distribution
  • Horizontal Scaling Secret #2: Automatic Fan-out of SQL Queries
  • Horizontal Scaling Secret #3: Automatic Data Rebalancing

The ClustrixDB Rebalancer is one of the core components providing #1 and #3 above.

Horizontal Scaling Secret #2, Automatic Fan-out of SQL Queries, is handled by the ClustrixDB Query Optimizer and the Query Evaluation Model. We drilled into the details of the Query Optimizer last time; this time we’ll discuss the Query Evaluation Model.

ClustrixDB Query Evaluation Model

A key ClustrixDB capability is its ability to “send the query to the data.” This is one of the fundamental principles that lets ClustrixDB scale near-linearly as more nodes are added. Other RDBMS systems routinely move large amounts of data to the node that is processing the query and then discard all the data that doesn’t match the query (often most of it). ClustrixDB appreciably reduces network traffic by moving only qualified data to the requesting node. Processors on multiple nodes can also be brought to bear on the data selection process. The system produces results more quickly by selecting data on multiple nodes in parallel, rather than selecting all data on a single node that must first gather the required data from the other nodes in the system.
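To make the difference concrete, here is a minimal Python sketch that counts how many rows cross the network under each approach. It is a toy model, not ClustrixDB code: the three-node layout, the row format, and the function names are all assumptions made up for illustration.

```python
# Toy model: three nodes, each holding a slice of a table as a list of dicts.
# Everything here is a hypothetical illustration, not ClustrixDB internals.
NODES = {
    "node1": [{"id": i, "amount": i * 10} for i in range(0, 100)],
    "node2": [{"id": i, "amount": i * 10} for i in range(100, 200)],
    "node3": [{"id": i, "amount": i * 10} for i in range(200, 300)],
}


def predicate(row):
    """Stand-in for a WHERE clause that only a few rows satisfy."""
    return row["amount"] >= 2950


def move_data_to_query(requesting_node):
    """Ship every remote row to the requesting node, then filter there."""
    shipped = 0
    result = []
    for node, rows in NODES.items():
        if node != requesting_node:
            shipped += len(rows)  # all remote rows cross the network
        result.extend(r for r in rows if predicate(r))
    return result, shipped


def move_query_to_data(requesting_node):
    """Filter on the node that owns the rows; ship only qualified rows."""
    shipped = 0
    result = []
    for node, rows in NODES.items():
        qualified = [r for r in rows if predicate(r)]  # runs where the data lives
        if node != requesting_node:
            shipped += len(qualified)
        result.extend(qualified)
    return result, shipped


rows_a, shipped_a = move_data_to_query("node1")
rows_b, shipped_b = move_query_to_data("node1")
print(f"same answer: {rows_a == rows_b}")
print(f"rows shipped, data to query: {shipped_a}")
print(f"rows shipped, query to data: {shipped_b}")
```

Both functions return the same rows; only the number of rows that had to travel differs.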

ClustrixDB uses parallel query evaluation for simple queries and Massively Parallel Processing (MPP) for analytic queries (akin to columnar stores). The Fair Scheduler additionally ensures that OLTP queries are prioritized ahead of OLAP queries. Data is read from the ranking replica assigned by the Rebalancer.

Note: For high availability (HA), ClustrixDB always maintains a minimum of two complete copies of the data. These copies are called “replicas,” and the number of replicas is configurable via MAX_REPLICAS. The Rebalancer also decides which replica is the “ranking” replica, i.e., the replica to which all read queries are sent. You might think that allowing reads from both replicas would increase parallelism, but it would mean two separate caches for the same data, i.e., one on each of the nodes holding the primary (ranking) and secondary replicas. This would use cache inefficiently. Worse, if the replicas aren’t heavily used, it would double the chances of a “cache miss” and a round trip to disk. ClustrixDB avoids all that by using only a single replica (the “ranking replica”) for reads, ensuring that only a single node’s buffer cache holds that data and avoiding unnecessary disk loads. It turns out that a single network hop to the node on which the data is cached is significantly faster than staying on the local node and loading that data from disk, even if that disk is an SSD.

The ranking replica will either reside on the same node as the query initiator (i.e., the GTM, or Global Transaction Manager, for that transaction’s session) or require at most a single hop. The number of hops that one query requires (zero or one) doesn’t change as the data set size and the number of nodes increase, enabling linear scalability of both writes and queries.
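The zero-or-one-hop property can be illustrated with a small Python sketch. The replica map below is hypothetical, and treating the first entry of each list as the ranking replica is an assumption for illustration only; in ClustrixDB the Rebalancer makes that choice.

```python
# Hypothetical slice -> replica placement. By assumption, the first node in
# each list holds the ranking replica (ClustrixDB's Rebalancer decides this).
REPLICAS = {
    "slice_a": ["node1", "node2"],
    "slice_b": ["node2", "node3"],
    "slice_c": ["node3", "node1"],
}


def read_target(slice_name):
    """All reads for a slice go to the node holding its ranking replica."""
    return REPLICAS[slice_name][0]


def hops_for_read(gtm_node, slice_name):
    """Zero hops if the ranking replica is local to the GTM node, else one."""
    return 0 if read_target(slice_name) == gtm_node else 1


for slice_name in REPLICAS:
    print(slice_name, "->", read_target(slice_name),
          "hops from node1:", hops_for_read("node1", slice_name))
```

However many slices or nodes are added to the map, `hops_for_read` can only ever return 0 or 1.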

Query Compilation to Fragments

Queries are broken down during compilation into fragments that are analogous to a collection of functions. For example, a simple query can be compiled into two functions: the first looks up where the value resides (i.e., from the local metadata map), and the second reads the correct value from the container on that node and slice and returns it to the user. (The details of concurrency, etc., have been left out for clarity and will be discussed in a later blog.)

In the example of a point select for id=15 on the DONATION table, Fragment 1 runs locally to look up the node on which the slice of DONATION containing id=15 resides. Fragment 2 is then sent to that node and run there. The result is collected on that node and then sent back to the initiating node (i.e., the GTM). Any final operations (e.g., GROUP BY or ORDER BY) are run on the GTM node, and then the result is returned to the calling application.
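The two-fragment idea can be sketched in a few lines of Python. This is only an analogy under stated assumptions: the three-node cluster is simulated with dictionaries, the modulo in `owner_of` is a stand-in for the real distribution map, and the function names are invented for the example.

```python
NUM_NODES = 3

# Hypothetical cluster: each node stores its slice of DONATION keyed by id.
node_storage = {n: {} for n in range(NUM_NODES)}


def owner_of(donation_id):
    """Stand-in for the distribution map: which node holds this id.
    (Modulo is an assumption; it is not ClustrixDB's actual mapping.)"""
    return donation_id % NUM_NODES


def load(rows):
    """Place each row on the node that owns it."""
    for row in rows:
        node_storage[owner_of(row["id"])][row["id"]] = row


def fragment_1(donation_id):
    """Runs on the initiating (GTM) node: look up where the row lives."""
    return owner_of(donation_id)


def fragment_2(node, donation_id):
    """Runs on the owning node: read the row from local storage."""
    return node_storage[node].get(donation_id)


def point_select(donation_id):
    target = fragment_1(donation_id)
    # In a real cluster, fragment 2 would be sent to `target` over the network.
    return fragment_2(target, donation_id)


load([{"id": i, "amount": 100 + i} for i in range(30)])
print(point_select(15))
```

Only the node returned by `fragment_1` is touched by `fragment_2`; the other nodes do no work for this query.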


Scaling Joins

Much more interesting than point-SELECTs are JOINs. JOINs describe the ability of an RDBMS to connect two different tables via parent/child relationships, implemented as PK (primary key)/FK (foreign key) relationships. The automatic maintenance of parent/child relationships is called “referential integrity,” and is natively provided by RDBMSs, but not by NoSQL DBMSs. This is why NoSQL databases have (very) poor support for JOINs, and why structured business data is typically stored in RDBMSs.

JOINs require more data movement by their nature. ClustrixDB achieves minimal data movement even in complex JOINs because:

  • Each representation (table or index) has its own distribution map, allowing direct lookups for which node/slice to go to next, removing broadcasts.
  • There is no central node orchestrating data motion. Data moves directly to the next node it needs to go to. This reduces hops to a minimum, given the data distribution.

Let’s look at a query that gets the name and amount for all donations collected by a particular bundler, identified by id = 15:
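A query of that shape can be tried out with the sketch below. The table names BUNDLER and DONATION and the columns B.ID and D.BUNDLER_ID come from the plan steps in this post; the remaining schema details, the index name, the sample rows, and the ORDER BY (added only to make the output deterministic) are assumptions. SQLite, via Python’s standard `sqlite3` module, is used purely as a single-node stand-in for ClustrixDB.

```python
import sqlite3

# In-memory SQLite database as a stand-in; schema and data are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE bundler (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE donation (
        id INTEGER PRIMARY KEY,
        bundler_id INTEGER REFERENCES bundler(id),
        amount INTEGER
    );
    CREATE INDEX bundler_key ON donation (bundler_id);
    INSERT INTO bundler VALUES (15, 'Alice'), (16, 'Bob');
    INSERT INTO donation VALUES (1, 15, 100), (2, 15, 250), (3, 16, 75);
""")

# Name and amount for all donations collected by the bundler with id = 15.
QUERY = """
    SELECT b.name, d.amount
    FROM bundler b
    JOIN donation d ON d.bundler_id = b.id
    WHERE b.id = 15
    ORDER BY d.amount
"""

rows = conn.execute(QUERY).fetchall()
for name, amount in rows:
    print(name, amount)
```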

ClustrixDB’s Query Evaluation Model works as follows. The ClustrixDB Query Optimizer looks at the relevant statistics, including quantiles, hot lists, etc., to determine a plan. Concurrency control for the query is managed by the starting node (the one that has the session), called the GTM node for that query. It coordinates with the Local Transaction Manager running on each node. (For details, see Concurrency Control, which will be discussed in a later blog.) The chosen plan has the following logical steps:

  1. Start on GTM node
  2. Look up nodes/slices where _ID_PRIMARY_BUNDLER has (B.ID = 15) and <forward> to that node
  3. Read name from the representation _ID_PRIMARY_BUNDLER
  4. Look up nodes/slices where _BUNDLER_KEY_DONATION has (D.BUNDLER_ID = B.ID = 15) and <forward> to that node
  5. Join the rows and <forward> to GTM node
  6. Return to user from GTM node

The key here is in Step #4. For joins, when the first row is read, there is a specific value for the join column. Using this specific value, the next node (which might be itself) can be looked up precisely.
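The six steps, and the precise lookup in Step #4, can be sketched as follows. This is a simulation under assumptions: both distribution functions are invented (a real cluster would consult each representation’s distribution map), the sample rows are made up, and “forwarding” is modelled by appending a node number to a path.

```python
NUM_NODES = 3


def bundler_node(bundler_id):
    """Hypothetical distribution map for the BUNDLER primary-key representation."""
    return bundler_id % NUM_NODES


def donation_index_node(bundler_id):
    """Hypothetical, separate distribution map for the DONATION index on bundler_id."""
    return (bundler_id * 7 + 1) % NUM_NODES


bundlers = {n: {} for n in range(NUM_NODES)}   # node -> {id: name}
donations = {n: {} for n in range(NUM_NODES)}  # node -> {bundler_id: [amounts]}


def load():
    """Place made-up sample rows on the nodes their maps point to."""
    for bid, name in [(15, "Alice"), (16, "Bob")]:
        bundlers[bundler_node(bid)][bid] = name
    for bid, amount in [(15, 100), (15, 250), (16, 75)]:
        donations[donation_index_node(bid)].setdefault(bid, []).append(amount)


def run_join(gtm_node, bundler_id):
    path = [gtm_node]                       # 1. start on the GTM node
    n1 = bundler_node(bundler_id)           # 2. look up the node and forward
    path.append(n1)
    name = bundlers[n1][bundler_id]         # 3. read name on that node
    n2 = donation_index_node(bundler_id)    # 4. precise lookup using the join value
    path.append(n2)
    rows = [(name, amt) for amt in donations[n2].get(bundler_id, [])]  # 5. join
    path.append(gtm_node)                   #    and forward to the GTM node
    return rows, path                       # 6. return to user


load()
rows, path = run_join(gtm_node=2, bundler_id=15)
print(rows)
print("nodes visited:", path, "hops:", len(path) - 1)
```

Because the join value is known before Step #4, the sketch computes exactly one target node rather than asking every node, and the row travels at most three hops.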

Let’s see this graphically. Each returned row has gone through a maximum of three hops inside the system. Note that this is significantly less network traffic than in a query evaluation model that requires each row to be broadcast for any correlated subqueries. Even as the number of returned rows increases, the work per row does not. Also, rows being processed on different nodes, and rows on the same node running different fragments, are processed concurrently on multiple cores. Rows on the same node running the same fragment use pipeline parallelism between them.

For example, assume we have three nodes. The three nodes are drawn multiple times in a single query, for every stage of query evaluation.


Now that we have seen the path of a single row, let’s assume we didn’t have the B.ID=15 condition. The join will now run in parallel on multiple machines, since the data is present on multiple nodes. Let’s look at another view; this time the nodes are drawn once and the steps flow downward in chronological order. Please note that while the forwarding for the join shows that rows may go to any node, this is in the context of the entire set of rows. A single row usually goes to only one node.


Joins with Massively Parallel Processing

As we see above, for a single row, ClustrixDB is able to precisely forward it by using a unicast.

Now, let’s zoom out and see what a large JOIN means at a system level, and how many rows are forwarded in a cluster. Each node is getting only the rows that it needs. The colors on the receiving nodes match those on the arrows. This is to indicate that rows are flowing correctly to nodes where they will find matching data for a successful join.
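As a rough illustration of why this matters at scale, the Python sketch below counts how many rows each node receives when rows are unicast to the node that holds their matches, compared with a broadcast to every node. The modulo-based `target_node` function and the twelve sample join values are assumptions for the example, not ClustrixDB behavior.

```python
from collections import Counter

NUM_NODES = 3


def target_node(join_value):
    """Hypothetical distribution map of the representation being joined to."""
    return join_value % NUM_NODES


def unicast_forwarding(join_values):
    """Each row goes only to the node that holds its matching rows."""
    return Counter(target_node(v) for v in join_values)


def broadcast_forwarding(join_values):
    """Each row is sent to every node, whether or not it can match there."""
    received = Counter()
    for _ in join_values:
        for node in range(NUM_NODES):
            received[node] += 1
    return received


values = list(range(12))  # made-up join column values
unicast = unicast_forwarding(values)
broadcast = broadcast_forwarding(values)
print("unicast rows per node:  ", dict(unicast), "total:", sum(unicast.values()))
print("broadcast rows per node:", dict(broadcast), "total:", sum(broadcast.values()))
```

With unicast, total traffic equals the number of rows; with broadcast, it grows with the number of nodes as well.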


The ClustrixDB Query Evaluation Model: Moving the Query to the Data

The ClustrixDB Query Evaluation Model automatically breaks each SQL query down into its component parts, and the compiled parts, i.e., functions, are sent directly to the node(s) on which the requisite data resides. This enables ClustrixDB to execute each query with maximum parallelism: data isn’t moved to a central node to be processed; only the result sets of queries run on each node are moved across the wire for final aggregation. Similarly, less complex queries that don’t need significant fan-out are run simultaneously and with maximum concurrency.
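The pattern of running fragments where the data lives and shipping only small result sets for final aggregation can be sketched in Python. The per-node data and function names are hypothetical, and a thread pool merely stands in for nodes working in parallel.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-node slices of donation amounts.
NODE_SLICES = {
    "node1": [100, 250, 40],
    "node2": [75, 10],
    "node3": [500],
}


def local_fragment(amounts):
    """Runs where the data lives; returns a small partial result (sum, count)."""
    return sum(amounts), len(amounts)


def final_aggregation(partials):
    """Runs on the initiating node; combines the partial results only."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total, count, total / count


# Threads stand in for nodes evaluating their fragment in parallel.
with ThreadPoolExecutor() as pool:
    partials = list(pool.map(local_fragment, NODE_SLICES.values()))

result = final_aggregation(partials)
print("partials:", partials)
print("total, count, average:", result)
```

Only one `(sum, count)` pair per node crosses the wire, regardless of how many rows each node scanned.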

By parallelizing queries and minimizing data movement, this query evaluation model lets ClustrixDB scale out both writes and reads linearly with each added node.