March 3, 2017

Hadoop in scaling up biological data analysis projects

In the bioinformatics world, we are all familiar with phrases like “as sequence entries in the major genomic databases currently rise exponentially”, “the explosion in volume of genomic data”, “as biological databases grow larger”, “dealing with all this ever-increasing information”… We come across such phraseology in journal articles, presentations and so on for a reason. The community has access to some great free, open-source software, in addition to commercially licensed applications, all developed through extensive research and effort invested in biological data analysis. But it is not uncommon for these programs or solutions to become obsolete when it comes to big data analysis, and the truth is that no matter how powerful your machines are, it is not always straightforward to “scale up” to handle big data.

Now let’s go over a simple example of finding lines in a text file matching a certain text pattern. For example, to get all the lines matching the word “dog” in our diary file, diary.txt, those who have access to a Linux/MacOS terminal would just type:

grep dog diary.txt

This will list all the lines containing “dog”, including any part-of-word occurrences, as in “dogmatic”. Of course, we can improve this command by adding the -i switch to perform case-insensitive matching, and we can be more stringent and list only lines matching whole words, omitting lines containing words such as “Dogruel”, with the -w switch. That’s a simple, effective solution for searching our little text file. But the question we should all get used to asking is valid here once again: is it scalable?
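Putting both switches together, the improved command for our diary example looks like this:

grep -iw dog diary.txt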

Now suppose our text file is a few terabytes in size, or, even worse, that we have a text file containing all the tweets posted since the launch of Twitter. Performing any operation on such big data in a reasonably fast and efficient way requires parallel processing and sharing the data across multiple machines.

Distributed file systems (DFS) allow us to share a file system transparently across many machines. Each file may be stored in smaller pieces across multiple machines, and files can be replicated across multiple machines. Yet this is all transparent to the user: if you perform an “ls” command on the DFS, you won’t see the individual data pieces, but the standard file structure you would see on a non-distributed system.

Apache Hadoop is a MapReduce framework for running applications on large clusters built of commodity hardware. Hadoop was derived from Google’s MapReduce and Google File System (GFS) papers. It is widely popular among “big data” companies; its prominent users include Yahoo, Facebook, LinkedIn, Twitter, Apple, Last.fm, eBay and many others. Hadoop only requires Java 1.6.x or higher, preferably from Sun/Oracle. It can run on commonly available hardware of different types and brands, and it does not specifically need very reliable, expensive hardware.

Before delving into MapReduce, I want to illustrate how easy and intuitive it is to use HDFS, the Hadoop Distributed File System, with a couple of examples. For instance, to list files under the directory /projects/, one would issue the command:

hadoop fs -ls /projects/

Similarly, to make a new directory under /projects:

hadoop fs -mkdir /projects/project101

And you get the idea! Basically, you can use many of the file-system-related Linux shell commands on Hadoop, provided you prefix them with “hadoop fs -”. To see all the other supported shell commands, check out the Hadoop file system shell documentation.
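A few more everyday examples, with placeholder paths: copying a local file into HDFS, printing it, and pulling it back out:

hadoop fs -put notes.txt /projects/project101/notes.txt

hadoop fs -cat /projects/project101/notes.txt

hadoop fs -get /projects/project101/notes.txt .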

A minimal Hadoop cluster requires two master nodes and at least one slave node (this setup will not be an optimal Hadoop cluster, but it can be used for testing purposes). In Hadoop terminology, the master node is called a Name Node. Name nodes are responsible for the HDFS layer of Hadoop. A second master, the Job Tracker, pushes work out to available Task Tracker nodes in the cluster. Each slave node is both 1) a Data Node, where HDFS data can be stored, and 2) a Task Tracker, which can run a sub-task assigned to it.
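Once such a cluster is up, a quick sanity check is to ask the name node which data nodes it can actually see; on a Hadoop 1.x-style installation like the one described here, that is:

hadoop dfsadmin -report

The report lists the configured and remaining capacity together with the live and dead data nodes.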

MapReduce is a paradigm for processing large amounts of data by breaking down a large task into smaller manageable tasks. First we “map”, then we “reduce”!

In the “map” step, the master node receives the input, processes and divides it into smaller tasks, and then distributes these sub-tasks to worker nodes. The input data must already be present on HDFS before this step:

hadoop fs -put diary.txt /projects/project101/diary.txt

Data on HDFS is divided into blocks of 128 MB (this is configurable). This means that our 300 MB diary.txt file will be divided into three blocks: two of 128 MB and one of 44 MB. These pieces are stored across multiple data nodes and can be replicated by a factor determined in the configuration. This data redundancy is beneficial for two reasons: it is a countermeasure against hardware failures, and it allows more efficient data access. HDFS blocks are large compared to disk blocks, which minimizes seek time. Individual map tasks in MapReduce operate on one block at a time, which means that if the number of sub-tasks is smaller than the number of nodes in the cluster, your job might run substantially slower than it otherwise would, partly because of additional but necessary data transfers between data nodes. Hadoop tries to co-locate the data with the compute node as much as possible.
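If you are curious where those blocks actually ended up, HDFS can report the block layout and replica locations of a file (using the path from the earlier example):

hadoop fsck /projects/project101/diary.txt -files -blocks -locations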

Pretty much all introductory material about MapReduce uses the canonical example of counting word occurrences in a set of documents to show the logic behind MapReduce. For a change, let’s demonstrate MapReduce with another example. Suppose we have thousands of digital diaries in a single huge file, each diary entry starts with a topic tag, and we want to efficiently find, for each topic tag, the longest entry in terms of total word count.

First, the mapper should drop all bad records by discarding any problematic entries; data preparation is usually performed in the mapping stage. For each entry, we then count the number of words and record it as a (tag, count) tuple. This list of tuples is passed to our reducer program, which finds the maximum number of words for each topic by maintaining a (tag, maxCount) map, as in this Java code snippet:

int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
    maxValue = Math.max(maxValue, values.next().get());
}
output.put(key, new Integer(maxValue));

In this reducer example, I used a map, designated by the “output” variable, on purpose: to puzzle a little any reader who was expecting “map” functionality only in the mapper! The map phase in MapReduce generates key-value pairs, but it has nothing to do with a hashtable/map adhering to the Java Map interface, which does not allow duplicate keys. It is merely a stream of key-value pairs.

The MapReduce design is flexible. A script or a binary that does nothing apart from echoing the received input can serve as a perfectly valid mapper or reducer. There is no restriction on how much of the core computation is performed by the mapper or reducer tasks. Furthermore, the ratio of mapper sub-tasks to reducer sub-tasks does not have to be one-to-one: the numbers of mapper and reducer sub-tasks are ultimately decided by Hadoop, although one can set limits in the configuration to guide that decision. Hadoop can launch reducer tasks while other mappers are still running, as soon as some mapper output is available. The progress of all submitted jobs and their MapReduce tasks can be monitored through a web interface on the job tracker.
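To make the first point concrete, the canonical “do-nothing” example from the Hadoop streaming documentation (the streaming module itself is introduced in the next paragraph) uses /bin/cat as an identity mapper and /usr/bin/wc as a reducer; the input path and output directory below are just placeholders:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input /projects/project102/input.txt -output catOutputDir -mapper /bin/cat -reducer /usr/bin/wc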

With Hadoop’s streaming module, one can use any preferred programming language to write MapReduce programs, as long as both scripts accept input from stdin and print their output to stdout. In fact, your MapReduce scripts should also work on a single machine, outside the Hadoop cluster, for reasonable input sizes. The MapReduce layer on Hadoop can be conveniently summarized with this Linux one-liner, in which the MapReduce scripts are assumed to be in Python:

cat diary.txt | python myPythonMapper.py | sort | python myPythonReducer.py

The key-value pairs generated by the mapper are sorted and then passed on to the reducer. This corresponds exactly to the following Hadoop streaming job submission command:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input /projects/project102/input.txt -output myOutputDir -mapper myPythonMapper.py -reducer myPythonReducer.py -file myPythonMapper.py -file myPythonReducer.py

The “-file” option must be used to ship the scripts and any other auxiliary files out to the cluster, unless these are already present on all the Hadoop nodes. Hadoop sorts the mapper output (the tuples) automatically. The streaming module is optional, just a convenience for running non-Java programs; otherwise, Java jar files containing map and reduce class implementations can be run on Hadoop directly:

hadoop jar myJar.jar com.eaglegenomics.test [any args]
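To make the streaming example above more concrete, here is a minimal sketch of what the hypothetical myPythonMapper.py and myPythonReducer.py could look like for the diary/topic-tag example, assuming each diary entry sits on a single line that starts with its topic tag and that the mapper emits tab-separated (tag, word count) pairs. It is an illustration of the pattern, not production code.

# myPythonMapper.py -- illustrative sketch: one diary entry per line, starting with its topic tag
import sys

for line in sys.stdin:
    fields = line.split()
    if len(fields) < 2:
        continue                                  # drop malformed entries in the map step
    tag, words = fields[0], fields[1:]
    print("%s\t%d" % (tag, len(words)))           # emit a tab-separated (tag, word count) pair

# myPythonReducer.py -- illustrative sketch: reads the sorted "tag<TAB>count" pairs from stdin
import sys

current_tag, max_count = None, 0
for line in sys.stdin:
    try:
        tag, count = line.rstrip("\n").split("\t")
        count = int(count)
    except ValueError:
        continue                                  # ignore anything that is not a "tag<TAB>count" pair
    if tag == current_tag:
        max_count = max(max_count, count)         # same tag: keep the largest count seen so far
    else:
        if current_tag is not None:
            print("%s\t%d" % (current_tag, max_count))   # finished a tag: emit its maximum
        current_tag, max_count = tag, count
if current_tag is not None:
    print("%s\t%d" % (current_tag, max_count))            # emit the final tag

Because streaming sorts the mapper output by key before the reduce phase, all counts for a given tag arrive contiguously, so the reducer only ever needs to keep one running maximum at a time.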

We always need faster, better algorithms, databases and workflows to cope with the rapid increase in biological data. But robust scaling up is not achievable only by making these better. For problems that can be expressed in a MapReduce fashion, Hadoop offers a time- and cost-effective divide-and-conquer platform that can be put to use in a multitude of data analysis projects.

Topics: Big data, Big data technology, Bioinformatics, Distributed File Systems, Hadoop, HDFS, MapReduce