# Hadoop MapReduce Can Transform How You Build Top-Ten Lists

September 25, 2012

It seems like websites, magazines, and TV shows all over the place are building top ten lists (or top-k lists) these days. The top ten science fiction movies of all time, the best places to live, etc. Top-ten lists are not only a lot of fun because of our seemingly primal need to create categories and hierarchies — they can actually be a useful way to analyze your data.

A lot of times, the most interesting records in your data set are the ones with the most extreme values. It’s mind-expanding to think about distilling billions or trillions of records down to just ten, and making the cut is a remarkable distinction for the records in the list. Here’s a design pattern you can use to develop a MapReduce job that produces a top-ten list from your data.

At my job at Greenplum, I get to work with a new data set every week while implementing Hadoop solutions for customers. One of my favorite things to do when I’m exploring new data is to find extremes by creating a top-ten list from all that information.

For example, we generated a list of the top ten phone numbers that had placed the most calls in a one-day period. The top phone number was placing millions of calls a day, which is obviously not normal for a cell phone. The others in the top ten were pretty abnormal as well, placing tens of thousands of calls a day. The list surfaced anomalies that the company should take a closer look at, to see whether they represent some sort of malicious or dangerous activity.

## The Top Ten Pattern

Before explaining how to solve this problem, first I want to tell you how not to. For this article, I’ll be using a running example of finding the top ten heaviest cats in our imaginary data set. The way I would do this in a SQL database is to order the data by weight, then take the top ten items off the top:

```
SELECT catname, weight FROM cats ORDER BY weight DESC LIMIT 10;

```

This SQL statement is going to find us the top ten heaviest cats in our data set.

Unfortunately, sorting the data first and then taking a limit is not something we want to be doing in MapReduce. Performing a total ordering of all the data (like in the TeraSort benchmark) is a significant computational endeavor. Without any tricks, total ordering requires a few MapReduce jobs and a significant amount of movement of data over the network. For example, the following Pig script (using 0.10.0) runs four distinct MapReduce jobs to produce the output:

```
cats = LOAD 'test.csv' AS (catname:chararray, weight:double);
ordcats = ORDER cats BY weight DESC PARALLEL 200;
heavycats = LIMIT ordcats 10;
DUMP heavycats;

```

So, sorting the data is not an option, but there is a clever way to do this.

To find a top ten list with only one MapReduce job, we’re going to set up a tournament in our Hadoop cluster. The tournament is pretty simple:

1. Each mapper finds its local top ten list and sends that list to the reducer.

2. The reducer finds the top ten from the finalists sent from the mappers.
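
The two-round structure is easy to see in miniature as plain Java, outside Hadoop. The class and helper names below are mine, and the sort-based `topK` is just for illustration (the real mappers avoid sorting, as we’ll see). Each inner list plays the role of one input split:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TopTenTournament {

    // Both rounds do the same thing: reduce a list to its k largest values.
    public static List<Double> topK(List<Double> values, int k) {
        List<Double> copy = new ArrayList<Double>(values);
        Collections.sort(copy, Collections.reverseOrder());
        return copy.subList(0, Math.min(k, copy.size()));
    }

    // Round 1: each "mapper" sends its local top k.
    // Round 2: the "reducer" picks the global top k from those finalists.
    public static List<Double> tournament(List<List<Double>> splits, int k) {
        List<Double> finalists = new ArrayList<Double>();
        for (List<Double> split : splits) {
            finalists.addAll(topK(split, k));
        }
        return topK(finalists, k);
    }
}
```

Because each split contributes at most k finalists, the final round only ever sees a tiny fraction of the data.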

## The Mappers

Our MapReduce job is going to use the default number of mappers: one per input split. Nothing special there.

In each map task, we’re going to find the top ten records for that particular input split. The reason why we need to find the top ten and not, for example, the top five (or top two), is to accommodate the unlikely case where the top records could all be within this one input split. To do this, we’re going to maintain a top ten data structure across the entire map task, modify it as we see records, then finally output the top ten at the end of the task.

I like to keep an ordered list using a TreeMap data structure, because it keeps the items in sorted order and it’s easy to trim off the end of it. To keep a top ten list in a TreeMap, keep inserting records into it with the key being the value to be sorted on (the weight) and the value being the record itself (the cat name). Then, after inserting a new record, trim the TreeMap back down to size ten by removing the smallest key. If the new item just inserted was heavier than some of the existing ten, the lightest one that was there before gets bumped off. If the new item was lighter than all of the existing ten, the new item itself is the one bumped off. (One caveat: since the weight is the map key, two cats with exactly the same weight will collide and only one survives; with real data you’d want a tie-breaking composite key.)
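
The put-then-trim trick can be tried on its own, outside Hadoop. Here is a minimal, self-contained sketch; the class name and the `limit` parameter are mine, not part of the job we’re building:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TopTenList {

    // Keys are the weights; TreeMap keeps them in ascending order.
    private final TreeMap<Double, String> top = new TreeMap<Double, String>();
    private final int limit;

    public TopTenList(int limit) {
        this.limit = limit;
    }

    public void add(double weight, String name) {
        top.put(weight, name);
        // Trim the lightest entry once we grow past the limit.
        if (top.size() > limit) {
            top.remove(top.firstKey());
        }
    }

    // Names of the surviving records, lightest to heaviest.
    public List<String> names() {
        return new ArrayList<String>(top.values());
    }
}
```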

In the code, declare the TreeMap as a private data member of the mapper class you are making. That way it is initialized once when the mapper starts and persists across every call to map.

```
public static class TTMapper extends Mapper<Object, Text, NullWritable, Text> {

    private TreeMap<Double, Text> fatcats = new TreeMap<Double, Text>();
    …
```

Next, in the map function, add the new record to the TreeMap. Then check if there are more than ten items in it, and if there are, trim the smallest one off. This makes sure the list is no longer than ten items, but also allows fewer than ten. Doing it this way accounts for the situation where an input split has only a couple of records in it.

```
…
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

    // Split on tab to get the cat name and the weight
    String v[] = value.toString().split("\t");
    Double weight = Double.parseDouble(v[1]);

    // Copy the Text; Hadoop reuses the same object across calls to map
    fatcats.put(weight, new Text(value));

    if (fatcats.size() > 10) {
        fatcats.remove(fatcats.firstKey());
    }
}
…
```

Note that in the map function we aren’t doing context.write(…) anywhere, which is definitely atypical. We have to wait until the task is complete to output the results. Thankfully, the MapReduce framework provides us with a cleanup function that conveniently runs after the last map function runs. This is where the last of our code for the mapper will go.

```
…
protected void cleanup(Context context)
        throws IOException, InterruptedException {

    for (Text catname : fatcats.values()) {
        context.write(NullWritable.get(), catname);
    }
}
}
…
```

Our TreeMap at this point should have at most ten records in it, and those records are now pushed through into the next phase of the job. Note that we are using NullWritable here. The reason is that we want all of the outputs from all of the mappers to be grouped under a single key in the reducer.

## The Reducer

We specifically want one reducer because there will only be one key in all of the data at this point (a NullWritable). All of the data from the mappers will be collected together. Remember to call job.setNumReduceTasks(1); in your driver code to be sure there is only one reducer.
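
For completeness, the driver might be wired up along these lines. This is a sketch using the new-API (org.apache.hadoop.mapreduce) conventions; the driver class name and the argument-based paths are illustrative:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopTenDriver {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "top ten cats");
        job.setJarByClass(TopTenDriver.class);
        job.setMapperClass(TTMapper.class);
        job.setReducerClass(TTReducer.class);
        job.setNumReduceTasks(1); // one reducer hosts the tournament final
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```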

The reducer works very similarly to the mapper in that it will have a TreeMap to store the top ten, then finally output the top ten all at once. However, since all of the records arrive in the Iterable of values in a single reduce call, we don’t need a cleanup method or state shared across multiple calls to reduce.

```
...
public static class TTReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        TreeMap<Double, Text> fatcats = new TreeMap<Double, Text>();

        for (Text value : values) {
            String v[] = value.toString().split("\t");
            Double weight = Double.parseDouble(v[1]);

            // Copy the Text; Hadoop reuses the same object while iterating
            fatcats.put(weight, new Text(value));

            if (fatcats.size() > 10) {
                fatcats.remove(fatcats.firstKey());
            }
        }

        for (Text t : fatcats.values()) {
            context.write(NullWritable.get(), t);
        }
    }
}
```

That’s about it. You should end up with a single tiny output file with the ten fattest cats around.

Running a job like this to compute your top ten list will be far more efficient than the alternative of sorting the data first, then taking the top ten. The amount of data being moved over the network to the reducer in the top ten job here is minuscule in comparison to all of the data in a sort.

More tips and a much more detailed discussion of this pattern can be found in Chapter 3 of my book, MapReduce Design Patterns, by Donald Miner and Adam Shook, available in October from O’Reilly Publishers.
