Introduction to Apache Spark

Big data problems are more common today than ever before. To survive them and stay efficient we need tools that help us process all that data.

Apache Spark can help us work big data problems out. In this article I'll give a first look at this framework.

Apache Spark describes itself as a "fast and general engine for large-scale data processing" and it is closely linked to Apache Hadoop; at the very least, they are close relatives.

I had the chance to write a post about Apache Hadoop on this blog, where I explored the concept of map/reduce in that framework.

That concept has been carried over to Spark, which also introduces the RDD (Resilient Distributed Dataset) as part of its core.

An RDD exposes two types of operation: transformations, which are applied to the data and produce a new RDD, and actions, which typically return a result to the driver.
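
Here is a minimal, self-contained sketch of the difference (the class name and the numbers are made up for the example): map and filter are transformations that only describe new RDDs, while count is an action that actually runs the computation.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RddBasics {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RddBasics").setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        JavaRDD<Integer> numbers = context.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Transformations: they only describe new RDDs, nothing runs yet
        JavaRDD<Integer> doubled = numbers.map(n -> n * 2);
        JavaRDD<Integer> big = doubled.filter(n -> n > 4);

        // Action: it triggers the actual computation and returns a result to the driver
        System.out.println(big.count()); // prints 3 (the values 6, 8 and 10)

        context.close();
    }
}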

So, why should we use Spark instead of Hadoop? I think a single graph is enough to give a valid reason for this choice:

The graph (from https://spark.apache.org/) shows the running time of a logistic regression job in Hadoop and in Spark.

Spark is much faster!

Have a look at the general architecture to get an idea of which components are included in the framework.

[Figure: the general architecture of Apache Spark]

The Core component is the base of the framework and it is responsible for job orchestration.

On top of it we can see four components that cover specific use cases:

  • Spark SQL is a component that provides a way to query and manage data through the SQL language (see the short sketch after this list).
  • Spark Streaming enables real-time processing of data.
  • MLlib is the machine learning component. If you're familiar with Apache Mahout, you can think of it as the natural successor.
  • GraphX is used to process graphs.
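
As a quick taste of the Spark SQL component, here is a minimal sketch in Java, assuming the Spark 1.x SQLContext API and a made-up users.json input file (one JSON object per line, with a country field); only the idea matters, not the file itself.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SparkSqlSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkSqlSketch").setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(context);

        // Load a JSON file (one object per line) as a DataFrame and register it as a table
        DataFrame users = sqlContext.read().json("users.json");
        users.registerTempTable("users");

        // Query the data with plain SQL
        sqlContext.sql("SELECT country, COUNT(*) AS total FROM users GROUP BY country").show();

        context.close();
    }
}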

Apache Spark is written in Scala and, unfortunately, I'm not very good with that language, so my example is written in Java, which is well supported by the framework.

In this sample, I'd like to show on a map the users who accessed a particular service, using their IP addresses to calculate latitude and longitude. The IP addresses are contained in an application log which will be processed by the Apache Spark application.

Starting from the LogAnalyzer sample (reference at the bottom of the post) I've written the code to parse the log and produce a JSON file consumed by a JavaScript application.

public static void main(String[] args) throws IOException {

    if (args.length < 1) {
        System.err.println("Please provide the input file full path as argument");
        System.exit(0);
    }

    init(args[0]);

    lookupService = new LookupService(prop.getProperty("file.geoipcity"));

    /*
     * Only for Windows
     */
    System.setProperty("hadoop.home.dir", prop.getProperty("hadoop.home.dir"));

    /*
     * Configure the application
     */
    SparkConf conf = new SparkConf().setAppName("org.sparkexample.IpGeoMap").setMaster("local");

    /*
     * Start the context
     */
    JavaSparkContext context = new JavaSparkContext(conf);

    /*
     * Load the log
     */
    JavaRDD<String> logLines = context.textFile(prop.getProperty("file.log"));

    /*
     * Extract the Ip Address
     */
    JavaRDD<String> ipFromLog = logLines.map(ParseLog::parseFromLogLine);

    /*
     * Filter the valid entry and Map them.
     */
    JavaPairRDD<String, String> pairs = ipFromLog.filter(LOG_FILTER).mapToPair(LOG_MAPPER);

    /*
     * Remove the duplicate entries
     */
    JavaPairRDD<String, String> reducer = pairs.reduceByKey(LOG_REDUCER);

    /*
     * Produce the geo location
     */
    JavaRDD<GeoLocation> maxMind = reducer.map(MAXMIND_MAPPER);

    /*
     * Write the output file
     */
    writeFile(produceJson(maxMind));

    context.stop();
    context.close();
}

Rather than dwelling on the mechanics of RDDs (which you can find better explained in the references at the end of this post), I'd like to focus on what happens to the data during the process.

/*
* Load the log
*/
JavaRDD<String> logLines = context.textFile(prop.getProperty("file.log"))

The data has been read and is ready to be manipulated, one element per log line.

2016-07-06 00:00:00,111 - Method Name:/checkentry ParamsValue= ... 151.36.87.69, ...
2016-07-06 00:00:31,270 - Method Name:/setrisklevel ParamsValue= ...,5.168.46.232, ...
2016-07-06 00:01:00,111 - Method Name:/checkentry ParamsValue= ... 151.36.87.69, ...
2016-07-06 00:02:00,111 - Method Name:/checkentry ParamsValue= ... 151.36.87.69, ...
2016-07-06 00:03:31,270 - Method Name:/setrisklevel ParamsValue= ...,5.168.46.232, ...
/*
* Extract the Ip Address
*/
JavaRDD<String> ipFromLog = logLines.map(ParseLog::parseFromLogLine)

Each line has been parsed, extracting only the IP address.

151.36.87.69
5.168.46.232
151.36.87.69
151.36.87.69
5.168.46.232
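
The parser itself is not part of the snippet above; a minimal version, assuming the IP address is simply the first dotted-quad found in the line (the real ParseLog class in the repository may differ), could look like this:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseLog {

    // Matches the first dotted-quad (e.g. 151.36.87.69) found in the line
    private static final Pattern IP_PATTERN =
            Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}");

    public static String parseFromLogLine(String line) {
        Matcher matcher = IP_PATTERN.matcher(line);
        // Return the IP address if one is present, otherwise an empty string
        return matcher.find() ? matcher.group() : "";
    }
}
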
/*
* Filter the valid entry and Map them.
*/
JavaPairRDD<String, String> pairs = ipFromLog.filter(LOG_FILTER).mapToPair(LOG_MAPPER)

The data is mapped as key/value pairs (key and value are the same). This is useful for the next step, the reduce.

(151.36.87.69,151.36.87.69)
(5.168.46.232,5.168.46.232)
(151.36.87.69,151.36.87.69)
(151.36.87.69,151.36.87.69)
(5.168.46.232,5.168.46.232)
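
LOG_FILTER and LOG_MAPPER are not shown here either; a plausible definition, as static fields of the application class and assuming the filter just discards lines where no IP was extracted, could be:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Keep only the entries where an IP address was actually extracted
private static final Function<String, Boolean> LOG_FILTER =
        ip -> !ip.isEmpty();

// Map every IP to a (key, value) pair, using the IP itself as both key and value
private static final PairFunction<String, String, String> LOG_MAPPER =
        ip -> new Tuple2<>(ip, ip);
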
/*
* Remove the duplicate entries
*/
JavaPairRDD<String, String> reducer = pairs.reduceByKey(LOG_REDUCER)

Through the reduce step the duplicate keys are removed.

(151.36.87.69,151.36.87.69)
(5.168.46.232,5.168.46.232)
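
For the reducer, any function that keeps one of the two values for a duplicated key will do; a possible definition, again as a static field of the application class, could be:

import org.apache.spark.api.java.function.Function2;

// When two values share the same key (the same IP), keep just one of them
private static final Function2<String, String, String> LOG_REDUCER =
        (value1, value2) -> value1;
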
/*
* Produce the geo location 
*/
JavaRDD<GeoLocation> maxMind = reducer.map(MAXMIND_MAPPER)

From the IP address I get the geographical location to place on the map. I used the MaxMind GeoLite service to get this information.
More info at http://dev.maxmind.com/geoip/legacy/geolite/.

{"lat":43.147903,"lon":12.109695}
{"lat":45.6243,"lon":9.105499}
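
MAXMIND_MAPPER could be defined roughly as follows, using the MaxMind legacy LookupService created in the main method. GeoLocation is the project's own bean, so the (lat, lon) constructor is an assumption of this sketch.

import org.apache.spark.api.java.function.Function;

import com.maxmind.geoip.Location;

import scala.Tuple2;

// Static field of the application class: resolve each (ip, ip) pair to a
// latitude/longitude through the MaxMind legacy LookupService.
// In local mode the static lookupService field is visible to the worker threads.
private static final Function<Tuple2<String, String>, GeoLocation> MAXMIND_MAPPER =
        pair -> {
            Location location = lookupService.getLocation(pair._1());
            // GeoLocation is the project's own bean; a (lat, lon) constructor is assumed
            return new GeoLocation(location.latitude, location.longitude);
        };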

Finally, the process writes the JSON file by iterating over the RDD.

private static String produceJson(JavaRDD<GeoLocation> maxMind) throws JsonProcessingException {
    ArrayList<String> text = new ArrayList<String>();
    ObjectMapper mapper = new ObjectMapper();
    Iterator<GeoLocation> geoIterator = maxMind.toLocalIterator();

    while (geoIterator.hasNext()) {
        GeoLocation geoLocation = geoIterator.next();
        text.add(mapper.writeValueAsString(geoLocation));
    }

    return text.toString();
}
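
The writeFile method is not shown above; a minimal version, assuming a file.output property holding the destination path (the property name is made up for this sketch), could be:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Write the JSON string to the output file read by the JavaScript application.
// The "file.output" property name is an assumption of this sketch.
private static void writeFile(String json) throws IOException {
    Files.write(Paths.get(prop.getProperty("file.output")),
            json.getBytes(StandardCharsets.UTF_8));
}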

The final result of the whole process is the following.

[Figure: the resulting map with the geolocated users]

That's all for today. I think this could be a good starting point to appreciate Spark's potential. I hope to write soon about the deployment environment and distributed computation, which is probably the real power of the platform.

The complete solution is available at

https://github.com/MarcoGhise/SparkMap

Reference

https://github.com/databricks/reference-apps/tree/master/logs_analyzer/chapter1/java8/src/main/java/com/databricks/apps/logs/chapter1

https://www.toptal.com/spark/introduction-to-apache-spark

http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds
