Hadoop in practice

Speaking of cloud computing, I would like to introduce Hadoop in a very simple, practical way.

After reading a lot about this topic, I wanted to experiment, through examples taken from the source code and written by hand, whether cloud computing will change our programming model in the near future.

Apache Hadoop is an open source framework born in 2007 and released under the Apache License. The framework is made of three parts: the core, called Hadoop Common; HDFS (the Hadoop Distributed File System); and the MapReduce engine. Our focus is on the last two.

HDFS is the Hadoop file system, designed to hold very large files. More information is available on the official web site (http://hadoop.apache.org/hdfs/).

The MapReduce engine is the framework that distributes the processing across the cluster nodes. More information is available here (http://hadoop.apache.org/mapreduce/).

Let’s take a look at a few diagrams to understand what we are talking about.

The first diagram shows the JobTracker/TaskTracker distribution.

The JobTracker sends jobs to the TaskTrackers on the different nodes and keeps track of the work progress.

The second diagram shows the HDFS distribution.

In this setup there is a single NameNode and multiple DataNodes spread across the nodes.

The last diagram shows the interaction between the map and the reduce processes.

 

All the nodes in the system talk to each other. The process is well explained in the Yahoo! developer tutorial: http://developer.yahoo.com/hadoop/tutorial/module4.html#dataflow.

OK, we have had a quick preview of the system. It’s time to install the framework on a local machine and run it.

First, the bad news: Hadoop is developed for Unix/Linux systems and, to use it in a Windows environment, you have to install Cygwin. A very good tutorial (which I followed) is available at http://v-lad.org/Tutorials/Hadoop/00%20-%20Intro.html

I used Hadoop version 0.20.2 even though the latest stable release is 1.0.1, because with the newer version I got a SetPermission exception when Hadoop tried to change folder permissions in HDFS. I believe that check only fails in a Windows environment.

In version 0.20.2 you will find three configuration files (instead of the single one the tutorial describes).

core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://localhost:9000</value>
     </property>
     <property>
        <name>hadoop.tmp.dir</name>
        <value>/cygwin/usr/tmp</value>
        <final>true</final>
     </property>          
</configuration>

hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
     <property>
         <name>dfs.replication</name>
         <value>1</value>
     </property>
     <property>
         <name>dfs.data.dir</name>
         <value>/cygwin/usr/mydir/dfs/data</value>
     </property>
</configuration>

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
     <property>
         <name>mapred.job.tracker</name>
         <value>localhost:9001</value>
     </property>
     <property>
        <name>mapred.child.tmp</name>
        <value>/cygwin/usr/tmp/hadoop/mapred</value>
     </property>
</configuration>

Now it’s time to start the Hadoop instance. You can do it by running bin/start-all.sh in the Cygwin terminal window.

If everything goes fine, you’ll see five processes running:

  • NameNode
  • SecondaryNameNode
  • DataNode
  • JobTracker
  • TaskTracker

The framework also exposes some web-based health monitors; by default the NameNode UI is available at http://localhost:50070 and the JobTracker UI at http://localhost:50030.

Having said that, I built my example starting from DBCountPageView, which you can find in the folder <hadoop>/src/examples/org/apache/hadoop/examples.

In my example I simulate building a business report for an order process. I used embedded Derby DB to store the order items and the final report.

The table schemas are:

CREATE TABLE ORDERS (ORDERID VARCHAR(20),PRODUCT_CODE VARCHAR(20), QUANTITY INTEGER, UNIT_PRICE FLOAT, DATE TIMESTAMP);
CREATE TABLE REPORT (ORDERID VARCHAR(20), TOTAL_QUANTITY INTEGER, TOTAL_PRICE FLOAT);

The data are:


INSERT INTO ORDERS VALUES('2','9458',42,0.543,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('2','6627',77,19.70,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('2','1057',11,35.36,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('2','0601',17,94.00,'2012-09-02 10:49:12');
INSERT INTO ORDERS VALUES('2','7058',63,80.64,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('2','9079',16,20.78,'2012-09-03 10:49:12');
INSERT INTO ORDERS VALUES('2','1408',4,33.10,'2012-09-06 10:49:12');
INSERT INTO ORDERS VALUES('2','3615',82,23.44,'2012-09-06 10:49:12');
INSERT INTO ORDERS VALUES('3','2569',64,25.19,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('3','6227',48,84.66,'2012-09-04 10:49:12');
INSERT INTO ORDERS VALUES('3','1668',56,39.89,'2012-09-03 10:49:12');
INSERT INTO ORDERS VALUES('3','3437',91,81.83,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('3','5744',9,47.72,'2012-09-06 10:49:12');
INSERT INTO ORDERS VALUES('3','8058',50,13.44,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('3','4776',3,59.45,'2012-08-31 10:49:12');
INSERT INTO ORDERS VALUES('3','7052',3,16.08,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('3','8945',63,1.035,'2012-09-06 10:49:12');
INSERT INTO ORDERS VALUES('3','8206',59,51.23,'2012-09-04 10:49:12');
INSERT INTO ORDERS VALUES('3','0460',82,12.00,'2012-08-31 10:49:12');
INSERT INTO ORDERS VALUES('4','2641',54,62.75,'2012-08-28 10:49:12');
INSERT INTO ORDERS VALUES('4','7460',1,71.13,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('4','6577',12,49.85,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('4','9969',82,57.85,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('4','8891',90,13.92,'2012-09-03 10:49:12');
INSERT INTO ORDERS VALUES('4','9778',10,40.08,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('4','4024',19,12.41,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('4','5604',31,45.40,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('4','9343',91,66.84,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('5','0370',81,3.931,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('5','1377',66,0.408,'2012-09-04 10:49:12');
INSERT INTO ORDERS VALUES('5','1920',55,69.37,'2012-09-02 10:49:12');
INSERT INTO ORDERS VALUES('5','2387',11,53.32,'2012-08-31 10:49:12');
INSERT INTO ORDERS VALUES('5','7381',10,80.03,'2012-09-04 10:49:12');
INSERT INTO ORDERS VALUES('5','6631',26,39.93,'2012-09-04 10:49:12');
INSERT INTO ORDERS VALUES('5','5733',80,20.94,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('5','9912',21,32.54,'2012-09-02 10:49:12');
INSERT INTO ORDERS VALUES('5','1161',68,9.391,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('5','0572',32,75.82,'2012-09-03 10:49:12');
INSERT INTO ORDERS VALUES('6','6622',72,35.96,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('6','9961',61,81.27,'2012-08-29 10:49:12');
INSERT INTO ORDERS VALUES('6','5552',94,31.70,'2012-09-05 10:49:12');
INSERT INTO ORDERS VALUES('6','7417',73,57.49,'2012-09-02 10:49:12');
INSERT INTO ORDERS VALUES('6','2922',75,1.031,'2012-09-02 10:49:12');
INSERT INTO ORDERS VALUES('6','3187',39,74.44,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('6','3862',63,27.83,'2012-09-02 10:49:12');
INSERT INTO ORDERS VALUES('6','7257',34,54.65,'2012-08-31 10:49:12');
INSERT INTO ORDERS VALUES('6','7424',52,80.46,'2012-09-06 10:49:12');
INSERT INTO ORDERS VALUES('6','5768',89,53.19,'2012-08-30 10:49:12');
INSERT INTO ORDERS VALUES('6','5058',70,22.84,'2012-08-29 10:49:12');
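
As a side note, here is a minimal sketch of how the embedded Derby database can be created from Java before running the job. This is my own illustrative addition, not part of the original example: the database name derbyDB matches the JDBC URL used by the job below, while the DerbySetup class name is made up, and the same statements can equally be run from the Derby ij tool.

package it.hadoopviewcounter;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Illustrative helper: creates the embedded Derby database used by the job.
public class DerbySetup {

	public static void main(String[] args) throws Exception {
		// Load the embedded Derby driver (derby.jar must be on the classpath).
		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");

		// ";create=true" creates the derbyDB database in the working directory on first use.
		Connection conn = DriverManager.getConnection("jdbc:derby:derbyDB;create=true");
		Statement st = conn.createStatement();

		// Execute the CREATE TABLE statements shown above, then the INSERTs, e.g.:
		st.executeUpdate("CREATE TABLE REPORT (ORDERID VARCHAR(20), "
				+ "TOTAL_QUANTITY INTEGER, TOTAL_PRICE FLOAT)");

		st.close();
		conn.close();
	}
}

Keep in mind that the job uses the relative JDBC URL jdbc:derby:derbyDB, so the database folder must be reachable from the working directory the application is launched from.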

I find it handy to show the main class first and then explain it.


package it.hadoopviewcounter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBCountPageView extends Configured implements Tool {

	private static final String[] OrderFieldNames = { "orderid",
			"product_code", "quantity", "unit_price" };
	private static final String[] ReportFieldNames = { "orderid",
			"total_quantity", "total_price" };

	private static final String DB_URL = "jdbc:derby:derbyDB";

	private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";

	static class OrderRecord implements Writable, DBWritable {

		String orderId;
		String product_code;
		int quantity;
		float unit_price;

		@Override
		public void readFields(DataInput in) throws IOException {
			this.orderId = Text.readString(in);
			this.product_code = Text.readString(in);
			this.quantity = in.readInt();
			this.unit_price = in.readFloat();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			// not used: OrderRecord is only read as map input, never serialized back out by the framework
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.orderId = resultSet.getString(1);
			this.product_code = resultSet.getString(2);
			this.quantity = resultSet.getInt(3);
			this.unit_price = resultSet.getFloat(4);
		}

		@Override
		public void write(PreparedStatement statement) throws SQLException {
			// not used: the ORDERS table is only read, never written back
		}

	}

	static class ReportRecord implements Writable, DBWritable {

		String orderId;
		int total_quantity;
		float total_price;

		public ReportRecord(String orderId, int total_quantity,
				float total_price) {
			this.orderId = orderId;
			this.total_quantity = total_quantity;
			this.total_price = total_price;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.orderId = Text.readString(in);
			this.total_quantity = in.readInt();
			this.total_price = in.readFloat();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			Text.writeString(out, orderId);
			out.writeInt(total_quantity);
			out.writeFloat(total_price);
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.orderId = resultSet.getString(1);
			this.total_quantity = resultSet.getInt(2);
			this.total_price = resultSet.getFloat(3);
		}

		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setString(1, orderId);
			statement.setInt(2, total_quantity);
			statement.setFloat(3, total_price);
		}

		@Override
		public String toString() {
			return orderId + " " +
					total_quantity + " " +
					total_price;
		}
	}

	@SuppressWarnings("deprecation")
	static class PageviewMapper extends MapReduceBase implements
			Mapper<LongWritable, OrderRecord, Text, OrderWritable> {

		@Override
		public void map(LongWritable key, OrderRecord value,
				OutputCollector<Text, OrderWritable> output, Reporter reporter)
				throws IOException {

			Text orderId = new Text(value.orderId);
			OrderWritable order = new OrderWritable(value.orderId, value.product_code, value.quantity, value.unit_price);
			output.collect(orderId, order);
		}
	}
	@SuppressWarnings("deprecation")
	static class PageviewReducer extends MapReduceBase implements
			Reducer<Text, OrderWritable, ReportRecord, NullWritable> {

		NullWritable n = NullWritable.get();

		@Override
		public void reduce(Text key, Iterator<OrderWritable> values,
				OutputCollector<ReportRecord, NullWritable> output,
				Reporter reporter) throws IOException {

			float total_price = 0;
			int total_quantity = 0;
			// accumulate the total quantity and total price over all the records of this order
			while (values.hasNext()) {
				OrderWritable order = values.next();
				total_quantity += order.getQuantity();
				total_price += order.getUnit_price() * order.getQuantity();
			}
			output.collect(new ReportRecord(key.toString(), total_quantity, total_price), n);
		}
	}
	@SuppressWarnings("deprecation")
	public int run(String[] args) throws Exception {

		JobConf job = new JobConf(getConf(), DBCountPageView.class);

		job.setJobName("Orders Report");

		job.setMapperClass(PageviewMapper.class);
//		job.setCombinerClass(LongSumReducer.class);
		job.setReducerClass(PageviewReducer.class);

		DBConfiguration.configureDB(job, DRIVER_CLASS, DB_URL);

		DBInputFormat.setInput(job, OrderRecord.class, "ORDERS", null, "orderid",
				OrderFieldNames);

		DBOutputFormat.setOutput(job, "REPORT", ReportFieldNames);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(OrderWritable.class);

		job.setOutputKeyClass(ReportRecord.class);
		job.setOutputValueClass(NullWritable.class);

		JobClient.runJob(job);

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int ret = ToolRunner.run(new DBCountPageView(), args);
		System.exit(ret);
	}
}

Here is the OrderWritable class.

package it.hadoopviewcounter;

import java.io.*;
import org.apache.hadoop.io.*;

/** A Writable order record, comparable on unit_price. */
public class OrderWritable implements WritableComparable {

	private String orderId;
	private String product_code;
	private int quantity;
	private float unit_price;

	public OrderWritable() {
	}

	public OrderWritable(String orderId, String product_code, int quantity, float unit_price) {
		this.orderId = orderId;
		this.product_code = product_code;
		this.quantity = quantity;
		this.unit_price = unit_price;
	}

	public void readFields(DataInput in) throws IOException {
		this.orderId = Text.readString(in);
		this.product_code = Text.readString(in);
		this.quantity = in.readInt();
		this.unit_price = in.readFloat();
	}

	public void write(DataOutput out) throws IOException {
		Text.writeString(out,orderId);
		Text.writeString(out,product_code);
		out.writeInt(quantity);
		out.writeFloat(unit_price);
	}

	/** Returns true iff <code>o</code> is an OrderWritable with the same field values. */
	public boolean equals(Object o) {
		if (!(o instanceof OrderWritable))
			return false;
		OrderWritable other = (OrderWritable) o;
		return ((this.orderId.equals(other.orderId)) &&
				(this.product_code.equals(other.product_code)) &&
				(this.quantity==other.quantity) &&
				(this.unit_price==other.unit_price));
	}

	public int hashCode() {
		return Float.floatToIntBits(unit_price);
	}

	/** Compares two OrderWritables by unit_price. */
	public int compareTo(Object o) {
		float thisValue = this.unit_price;
		float thatValue = ((OrderWritable) o).unit_price;
		return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
	}

	public String toString() {
		return Float.toString(unit_price);
	}

	public void setOrderId(String orderId) {
		this.orderId = orderId;
	}

	public String getOrderId() {
		return orderId;
	}

	public void setProduct_code(String product_code) {
		this.product_code = product_code;
	}

	public String getProduct_code() {
		return product_code;
	}

	public void setQuantity(int quantity) {
		this.quantity = quantity;
	}

	public int getQuantity() {
		return quantity;
	}

	public void setUnit_price(float unit_price) {
		this.unit_price = unit_price;
	}

	public float getUnit_price() {
		return unit_price;
	}

	/** A raw-bytes Comparator adapted from FloatWritable. Note that it reads a float from
	 *  the start of the serialized record, while OrderWritable serializes the orderId string
	 *  first, so it is effectively unused here (OrderWritable is only a map output value). */
	public static class Comparator extends WritableComparator {
		public Comparator() {
			super(OrderWritable.class);
		}

		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			float thisValue = readFloat(b1, s1);
			float thatValue = readFloat(b2, s2);
			return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0
					: 1));
		}
	}

	static { // register this comparator
		WritableComparator.define(OrderWritable.class, new Comparator());
	}

}

Now the explanation.

The map and reduce processing involves implementing the map() and reduce() methods.

The map method is called for every record in the ORDERS table. I used the OrderRecord class to access the record fields inside my Java code; in the same way, the ReportRecord class is used to write records into the REPORT table.

In the reduce method I calculate the total quantity and the total price over all the records aggregated under the same orderId key.

A short worked example should make this clearer.
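
Taking order '2' from the data above, the reducer receives its eight ORDERS rows and accumulates quantity × unit_price: 42×0.543 + 77×19.70 + 11×35.36 + 17×94.00 + 63×80.64 + 16×20.78 + 4×33.10 + 82×23.44 ≈ 10993.95, which is (up to float rounding) the TOTAL_PRICE shown for order 2 in the REPORT table at the end of the post.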

You can see the result by running the application. Before running it, I found it useful to configure Eclipse so that the job can be debugged locally.

Open the Run Configurations dialog, select your configuration and click the Classpath tab. There you can see the Hadoop configuration that Eclipse will use to run your application.

Edit the configuration file (mine is under <eclipse_workspace>\.metadata\.plugins\org.apache.hadoop.eclipse\hadoop-conf-4251624898335225065/hadoop-site.xml) and change these entries:


<property><name>hadoop.tmp.dir</name><value>/usr/tmp</value></property>
...
<property><name>fs.default.name</name><value>file:///</value></property>
...
<property><name>dfs.data.dir</name><value>/usr/mydir/dfs/data</value></property>
...
<property><name>mapred.job.tracker</name><value>local</value></property>
...
<property><name>mapred.child.tmp</name><value>/usr/tmp/hadoop/mapred</value></property>

Finally, you can run the application. First make sure the Hadoop instance is running, then launch the class. If everything goes fine, you’ll see this output in the Eclipse console:

12/09/10 14:40:37 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
12/09/10 14:40:38 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/09/10 14:40:40 INFO mapred.JobClient: Running job: job_local_0001
12/09/10 14:40:40 INFO mapred.MapTask: numReduceTasks: 1
12/09/10 14:40:40 INFO mapred.MapTask: io.sort.mb = 100
12/09/10 14:40:40 INFO mapred.MapTask: data buffer = 79691776/99614720
12/09/10 14:40:40 INFO mapred.MapTask: record buffer = 262144/327680
12/09/10 14:40:40 INFO mapred.MapTask: Starting flush of map output
12/09/10 14:40:41 INFO mapred.MapTask: Finished spill 0
12/09/10 14:40:41 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/09/10 14:40:41 INFO mapred.LocalJobRunner:
12/09/10 14:40:41 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/09/10 14:40:41 INFO mapred.LocalJobRunner:
12/09/10 14:40:41 INFO mapred.Merger: Merging 1 sorted segments
12/09/10 14:40:41 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 1009 bytes
12/09/10 14:40:41 INFO mapred.LocalJobRunner:
12/09/10 14:40:41 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/09/10 14:40:41 INFO mapred.LocalJobRunner: reduce > reduce
12/09/10 14:40:41 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/09/10 14:40:41 INFO mapred.JobClient:  map 100% reduce 100%
12/09/10 14:40:41 INFO mapred.JobClient: Job complete: job_local_0001
12/09/10 14:40:41 INFO mapred.JobClient: Counters: 13
12/09/10 14:40:41 INFO mapred.JobClient:   FileSystemCounters
12/09/10 14:40:41 INFO mapred.JobClient:     FILE_BYTES_READ=48943
12/09/10 14:40:41 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=88412
12/09/10 14:40:41 INFO mapred.JobClient:   Map-Reduce Framework
12/09/10 14:40:41 INFO mapred.JobClient:     Reduce input groups=5
12/09/10 14:40:41 INFO mapred.JobClient:     Combine output records=0
12/09/10 14:40:41 INFO mapred.JobClient:     Map input records=53
12/09/10 14:40:41 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/09/10 14:40:41 INFO mapred.JobClient:     Reduce output records=5
12/09/10 14:40:41 INFO mapred.JobClient:     Spilled Records=106
12/09/10 14:40:41 INFO mapred.JobClient:     Map output bytes=901
12/09/10 14:40:41 INFO mapred.JobClient:     Map input bytes=53
12/09/10 14:40:41 INFO mapred.JobClient:     Combine input records=0
12/09/10 14:40:41 INFO mapred.JobClient:     Map output records=53
12/09/10 14:40:41 INFO mapred.JobClient:     Reduce input records=53

Take a look at the REPORT table:


ij> SELECT * FROM REPORT;
ORDERID             |TOTAL_QUAN&|TOTAL_PRICE
-------------------------------------------------------
2                   |42         |10993.9462890625
3                   |56         |15080.2158203125
4                   |54         |18180.759765625
5                   |66         |11690.646484375
6                   |94         |48570.1484375

5 rows selected

Summary

From what I’ve seen, Hadoop is a very interesting solution for distributed computation. Its references are the best possible (Amazon, Google’s MapReduce) and, from what I tried, it is quite easy to use.

In my example I customized the class org.apache.hadoop.mapred.lib.db.DBInputFormat, removing the LIMIT keyword from the generated SELECT query (around line 113 of the original source), because Derby DB does not support the LIMIT keyword.
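
As a rough illustration of what that customization amounts to, the snippet below builds the SELECT statement the same way but without the trailing LIMIT/OFFSET clause. This is a standalone sketch of mine, not the actual Hadoop source: in the real copy the change lives inside the record reader of DBInputFormat, where the query string is assembled, and the parameter names here simply mirror the arguments passed to DBInputFormat.setInput().

package it.hadoopviewcounter;

// Illustrative sketch: a Derby-friendly version of the input query builder.
public final class DerbyFriendlySelectBuilder {

	public static String buildSelectQuery(String tableName, String[] fieldNames,
			String conditions, String orderBy) {
		StringBuilder query = new StringBuilder("SELECT ");
		for (int i = 0; i < fieldNames.length; i++) {
			query.append(fieldNames[i]);
			if (i != fieldNames.length - 1) {
				query.append(", ");
			}
		}
		query.append(" FROM ").append(tableName);
		if (conditions != null && conditions.length() > 0) {
			query.append(" WHERE (").append(conditions).append(")");
		}
		if (orderBy != null && orderBy.length() > 0) {
			query.append(" ORDER BY ").append(orderBy);
		}
		// The stock query builder appends " LIMIT <split length> OFFSET <split start>"
		// at this point so that each input split reads only its own slice of the table.
		// Derby rejects that syntax, so the clause is simply dropped; with a single
		// split (as in this local example) the result set is the same.
		return query.toString();
	}

	public static void main(String[] args) {
		// Reproduces the query used by this job:
		// SELECT orderid, product_code, quantity, unit_price FROM ORDERS ORDER BY orderid
		System.out.println(buildSelectQuery("ORDERS",
				new String[] { "orderid", "product_code", "quantity", "unit_price" },
				null, "orderid"));
	}
}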

I’m going to write more articles about this topic in upcoming posts. I hope you enjoy them.

Update

I’ve just found a new video about Spring + Hadoop. Enjoy it!
