CPU Usage in Massively Distributed Analytic Data Warehouses

September 4, 2014 Andreas Scherbaum

Recently, a customer asked how Pivotal Greenplum Database (GPDB) ensures all available CPU resources are used for processing a query in an analytic data warehouse environment. As we covered the details, we realized that it is a complicated topic and not easy to measure. To help others understand, I decided to run some tests and gather system statistics that will help explain massively parallel databases and distributed computing.

Below, we explain the measurements, describe how GPDB works, and break down the results of each query across the distributed environment.

First Problem: How to Measure CPU Usage?

A typical GPDB appliance, or full rack, consists of 16 segment servers and 2 master servers (i.e., one master and one standby master). We should be able to see CPU usage on the master and on the segment servers. On a typical Linux system, the file “/proc/stat” provides system statistics. Among many other lines, it contains one line labeled “cpu” (total stats over all CPUs) and one line per CPU core. The first seven fields on each of these lines are:

  • user: normal processes executing in user mode (all values are measured in units of USER_HZ, also called jiffies)
  • nice: niced processes executing in user mode
  • system: processes executing in kernel mode
  • idle: nothing to do
  • iowait: waiting for I/O operations to complete
  • irq: serving interrupts
  • softirq: serving soft interrupts

To gather this information, I wrote a small script which periodically dumps this data into a file. To measure whole seconds, the script waits until the next uptime second starts, and then gathers the CPU stats listed above. After that, it gathers the stats again at the start of each full second. Importantly, the script does not simply sleep for roughly 1 second; it constantly watches for the moment a new second starts. For every second, the differences compared to the previous record are written into a logfile.
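The sampling approach described above could look roughly like the following Python sketch. This is not the original script, which is not shown in this post. The function names, the polling interval, and the use of the first value in /proc/uptime as the "uptime second" are assumptions made for illustration.

```python
import time

# Names of the first seven per-CPU counters in /proc/stat, in file order.
FIELDS = ("user", "nice", "system", "idle", "iowait", "irq", "softirq")


def parse_proc_stat(text):
    """Return {cpu_name: (user, nice, system, idle, iowait, irq, softirq)}."""
    stats = {}
    for line in text.splitlines():
        parts = line.split()
        # Keep the "cpu" line (sum over all cores) and "cpu0", "cpu1", ... only.
        if parts and parts[0].startswith("cpu"):
            stats[parts[0]] = tuple(int(v) for v in parts[1:1 + len(FIELDS)])
    return stats


def diff_stats(previous, current):
    """Per-CPU difference (in jiffies) between two parsed snapshots."""
    return {
        cpu: tuple(c - p for c, p in zip(current[cpu], previous[cpu]))
        for cpu in current
        if cpu in previous
    }


def uptime_second():
    """Whole seconds since boot, read from /proc/uptime (Linux only)."""
    with open("/proc/uptime") as f:
        return int(float(f.read().split()[0]))


def sample_forever(write_line, poll_interval=0.001):
    """Call write_line(second, deltas) each time the uptime second changes."""
    last_second = uptime_second()
    previous = None
    while True:
        now = uptime_second()
        if now == last_second:
            time.sleep(poll_interval)  # short poll instead of sleeping ~1 s
            continue
        last_second = now
        with open("/proc/stat") as f:
            current = parse_proc_stat(f.read())
        if previous is not None:
            write_line(now, diff_stats(previous, current))
        previous = current
```

The first snapshot only establishes a baseline; deltas are emitted from the second snapshot onward. `write_line` is a hypothetical callback that would format one logfile line.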

Since every server has 32 CPU cores, each logfile line will have 32 x 8 fields (32 cores x 7 fields from /proc/stat plus cpu number) plus 1 x 8 fields for the overall sum, plus 3 additional columns: hostname, uptime second and current timestamp. Together, this is 267 columns of information, and the stats are gathered on the master server and one segment server.
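The column count can be checked with a few lines of Python. The constant and function names below are only illustrative; the numbers come from the description above.

```python
CORES = 32            # CPU cores per server, as stated above
FIELDS_PER_CPU = 7    # user, nice, system, idle, iowait, irq, softirq
EXTRA_COLUMNS = 3     # hostname, uptime second, current timestamp


def logfile_columns(cores=CORES):
    """Columns per logfile line: per-core blocks, the sum block, and extras."""
    per_cpu = FIELDS_PER_CPU + 1  # seven counters plus the CPU number
    return cores * per_cpu + per_cpu + EXTRA_COLUMNS


print(logfile_columns())  # 32 * 8 + 8 + 3 = 267
```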

Second Problem: How to Make Use of All the Data?

The answer to that is actually easy. Gnuplot, an open source plotting tool, can create sufficient and effective graphics once it is fed with useful input data. Of course, other tools can also read the raw data.

Third Problem: What to Measure?

After running some tests, we chose a test data set from a customer pilot or proof of concept. For details below, I changed table and column names, but the queries and data come straight from what the customer provided. To begin creating insightful reports, I ran two different queries in three variants as explained below.

The first one is a simple query:

SELECT COUNT(*) FROM diagnose;

QUERY PLAN
--------------------------------------------------------------------------
 Aggregate  (cost=9411955.32..9411955.33 rows=1 width=8)
   ->  Gather Motion 128:1  (slice1; segments: 128)  (cost=9411954.00..9411955.30 rows=1 width=8)
         ->  Aggregate  (cost=9411954.00..9411954.01 rows=1 width=8)
               ->  Seq Scan on diagnose  (cost=0.00..7875750.00 rows=4800638 width=0)
(4 rows)

The second query is more complicated:

SELECT a12.quarter AS quarter,
a11.kv_id AS kv_id,
SUM(a11.efficiency_frequency) AS col1,
SUM(a11.service_requirements_currency1) AS cur1,
SUM(a11.service_requirements_currency2) AS cur2,
SUM(a11.service_requirements_points) AS points
FROM service_requirements a11
JOIN data_basis a12
ON (a11.quarter = a12.prev_quarter)
WHERE a12.quarter IN (20102, 20103, 20104)
GROUP BY a12.quarter, a11.kv_id;

QUERY PLAN
--------------------------------------------------------------------------
 Gather Motion 128:1  (slice3; segments: 128)  (cost=17030708.71..17030709.73 rows=1 width=108)
   ->  HashAggregate  (cost=17030708.71..17030709.73 rows=1 width=108)
         Group By: a12.quarter, a11.kv_id
         ->  Redistribute Motion 128:128  (slice2; segments: 128)  (cost=17030706.42..17030707.44 rows=1 width=108)
               Hash Key: a12.quarter, a11.kv_id
               ->  HashAggregate  (cost=17030706.42..17030706.42 rows=1 width=108)
                     Group By: a12.quarter, a11.kv_id
                     ->  Hash Join  (cost=96.96..16756955.49 rows=142579 width=28)
                           Hash Cond: a11.quarter = a12.prev_quarter
                           ->  Seq Scan on service_requirements a11  (cost=0.00..9198189.20 rows=4800465 width=28)
                           ->  Hash  (cost=92.21..92.21 rows=3 width=4)
                                 ->  Broadcast Motion 128:128  (slice1; segments: 128)  (cost=0.00..92.21 rows=3 width=4)
                                       ->  Seq Scan on data_basis a12  (cost=0.00..88.38 rows=1 width=4)
                                             Filter: quarter = ANY ('{20102,20103,20104}'::integer[])

Four aggregates, two tables, a join, and WHERE conditions: this should result in some work for the CPUs.

The third variant is actually the second query, run three times in parallel.

The Gory Details of the Greenplum Queries and Pipelining

GPDB splits up queries into independent parts called slices. Everything that can be executed independently of the other parts of the query is moved into one slice. Results can flow from one slice into another without writing intermediate results to disk. This is called “pipelining” and is a major advantage of Greenplum in terms of speed and distributed computing.

CPU_Pipeline

When a query is executed on a distributed segment server, every slice is started in a separate process. This allows the operating system to run every process on a different, independent CPU core. This is exactly what we want to see.

Now, the first query above has only one slice, called “slice1”. There isn’t much to spread across multiple CPU cores; in the end, it is only reading from disk and counting rows. The second query, on the other hand, uses three slices. The “data_basis” scan is “slice1”, the scan on “service_requirements” together with the join against the broadcast data is “slice2”, and the final aggregation feeding the gather motion is “slice3”. This makes 3 slices, and therefore 3 different processes, per segment database. With 8 segment databases on each segment server (128 segments spread over 16 servers), 3 slices should result in 24 different processes per segment server.
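The process count can be derived from the plan text itself. The sketch below counts the distinct slice labels in an abbreviated copy of the second plan and multiplies by the number of segment databases per server. The helper names are made up for this example, and the one-process-per-slice-per-segment model is the simplification used in this post.

```python
import re

# Abbreviated copy of the second query plan (cost figures omitted).
PLAN = """\
Gather Motion 128:1  (slice3; segments: 128)
  ->  HashAggregate
        ->  Redistribute Motion 128:128  (slice2; segments: 128)
              ->  HashAggregate
                    ->  Hash Join
                          ->  Seq Scan on service_requirements a11
                          ->  Hash
                                ->  Broadcast Motion 128:128  (slice1; segments: 128)
                                      ->  Seq Scan on data_basis a12
"""


def count_slices(plan_text):
    """Number of distinct slice labels found on Motion nodes."""
    return len(set(re.findall(r"\(slice(\d+);", plan_text)))


def processes_per_server(plan_text, total_segments=128, segment_servers=16, queries=1):
    """Expected query processes on one segment server."""
    segments_per_server = total_segments // segment_servers  # 128 / 16 = 8
    return queries * count_slices(plan_text) * segments_per_server


print(count_slices(PLAN))                     # 3
print(processes_per_server(PLAN))             # 24
print(processes_per_server(PLAN, queries=3))  # 72
```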

The Results of CPU Usage in Greenplum

Let’s look at the three query results.

First Query Results

small_query_master

Now, let’s look at the segment server:

The CPU usage shows that 8 cores, out of 32, are busy.

small_query_sdw1

It’s also clear that most CPU activity is in system calls and I/O waits:

small_query_sdw1_iowait

small_query_sdw1_system

The CPU activity in user space is almost nonexistent:

small_query_sdw1_user

Clearly this query is I/O bound and waiting for data coming in from disk. The CPU cores cannot do much here. In fact, 24 out of 32 cores are idle. This is not very impressive.
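One possible way to turn per-second deltas into a "busy cores" count is sketched below. The threshold, the function name, and the sample values are invented for illustration and are not measurements from this test. Time spent in I/O wait is counted as activity here, which is an assumption chosen to match how the graphs are read in this post.

```python
def busy_cores(deltas, threshold=0.5):
    """Return the CPUs whose non-idle share of the interval exceeds threshold.

    deltas maps a CPU name to one second's worth of counter differences:
    (user, nice, system, idle, iowait, irq, softirq).
    """
    busy = []
    for cpu, (user, nice, system, idle, iowait, irq, softirq) in deltas.items():
        if cpu == "cpu":  # skip the line that sums all cores
            continue
        total = user + nice + system + idle + iowait + irq + softirq
        if total and (total - idle) / total > threshold:
            busy.append(cpu)
    return busy


# Made-up sample: 8 cores mostly in iowait/system, 24 cores almost idle.
sample = {
    f"cpu{i}": (1, 0, 30, 10, 59, 0, 0) if i < 8 else (0, 0, 1, 99, 0, 0, 0)
    for i in range(32)
}
print(len(busy_cores(sample)))  # 8
```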

Second Query Results

Now, we can look at the bigger query.

This query runs for 7 minutes 40 seconds. On the master, we can see some CPU usage at the beginning of the query, as we have seen before. However, there is also a big spike at the end, and this spike represents the gather motion:

big_query_master

On the segment server, the picture looks almost the same, except more CPU cores are busy:

big_query_sdw1

In detail we can see that most of the system is dealing with I/O waits:

big_query_sdw1_iowait

big_query_sdw1_system

The CPU usage for user space programs is almost nonexistent:

big_query_sdw1_user

It becomes clear that even this query does not use all CPU resources. Because the CPU cores are not fully utilized, the operating system schedules the processes on fewer cores: 16 cores show activity, as opposed to the theoretically possible 24. This makes sense: if a process is forked from the master database process, the memory which is copied is still in the cache (L1, L2, and so on) of the core where the parent is running. By keeping child processes on the same core, the system avoids costly memory transfers. And, after all, half the cores are still idle.

Third Query Results

Running the same query three times in parallel adds more perspective to the behavior.

What happens if the above query is started three times in parallel? After all, we still have half the CPU cores doing nothing.

The query runtime does increase, but it does not triple. Each of the three queries completes in between 613 and 615 seconds (10 minutes and 13 to 15 seconds). These are very similar execution times for all three queries.

The picture on the master remains the same with some spikes at the beginning and the end:

three_queries_master

However, the picture on the segment server changes. Now, all CPU cores are used constantly:

three_queries_sdw1

Both the I/O wait and the system usage show that all cores are now in use:

three_queries_sdw1_iowait

three_queries_sdw1_system

The CPU usage at the userspace level remains very low:

three_queries_sdw1_user

The Overall Conclusion:

For these types of queries, the database has to spawn 72 processes on each segment server (3 queries in parallel, 3 slices each, on 8 segment databases) and spread them across the 32 cores. The graphs show that a higher number of processes still remains on half the cores, but whenever more CPU resources are needed, processes are moved to other cores. This also explains why the 3 queries do not need 23 minutes (3 x 7 min 40 sec) but merely 10 minutes 15 seconds in total.
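The runtime comparison is simple arithmetic on the numbers reported above, shown here as a short Python check. The variable names are only illustrative.

```python
def to_seconds(minutes, seconds):
    """Convert a minutes/seconds runtime into seconds."""
    return minutes * 60 + seconds


single_run = to_seconds(7, 40)   # one query on its own: 460 s
serial_estimate = 3 * single_run  # three runs back to back: 1380 s
parallel_observed = 615           # slowest of the three parallel runs

print(serial_estimate / 60)                           # 23.0 (minutes)
print(round(serial_estimate / parallel_observed, 2))  # 2.24
```

In other words, running the three queries in parallel finished roughly 2.2 times faster than running them one after another would have.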

The more sophisticated your queries become, the better Pivotal Greenplum Database is able to split the work up into more slices and execute slices on different CPU cores. This drastically improves query performance.

 

 

About the Author

Andreas Scherbaum

Andreas Scherbaum has been working with PostgreSQL since 1997. He is involved in several PostgreSQL-related community projects, is a member of the Board of Directors of the European PostgreSQL User Group, and also wrote a PostgreSQL book (in German). Since 2011, he has been working for EMC/Greenplum/Pivotal, tackling very big databases.
