5 minutes with – Spring Integration

A very common need nowadays is to transform data and process it through a pipeline that, at the end, delivers data ready to use.

In this article I’ll show how to do that with Spring Integration, using a simple example.

Spring Integration builds on the concepts of Enterprise Application Integration (EAI) and helps us build a pipeline of tasks through which the data is processed.

The idea is to read the data from a source, process it through enrichment, filtering and grouping steps, and then make it available to other processors.

So, the common layout is one inbound endpoint, one or more processing steps and one outbound endpoint.

The data is encapsulated in an object named “message”, which is split in two parts: the header and the payload. The header contains some properties of the message (most of them depend on the type of the endpoint), while the payload contains the data of the message.

The classic example is a message coming from a file: the header contains properties such as the filename and the size, and the payload contains the content of the file, binary or textual.
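To make the header/payload split concrete, here is a minimal plain-Java sketch of such a message. The class `SimpleMessage` and the header names `filename` and `size` are hypothetical, chosen only for illustration; they are not Spring Integration's own Message type or its real header keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a message: read-only headers plus a payload.
final class SimpleMessage<T> {
    private final Map<String, Object> headers;
    private final T payload;

    SimpleMessage(Map<String, Object> headers, T payload) {
        // Copy the headers so that later changes to the caller's map do not leak in
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        this.payload = payload;
    }

    Map<String, Object> getHeaders() { return headers; }

    T getPayload() { return payload; }

    // Builds the kind of message a file endpoint could produce for a text file.
    static SimpleMessage<String> fromTextFile(String filename, String content) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("filename", filename);
        headers.put("size", (long) content.getBytes(StandardCharsets.UTF_8).length);
        return new SimpleMessage<>(headers, content);
    }
}
```

For example, `SimpleMessage.fromTextFile("flights.txt", "abc")` carries the text `abc` as payload, while the name and the size in bytes travel in the headers.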

The data travels between the endpoints and the processing steps through channels and, graphically, it looks something like this:

[Figure: integration graph of the pipeline (integration1)]

I produced it with the Spring plugin for Eclipse; it is shown when you switch to the integration-graph view.

The process starts from a file inbound adapter (at the left margin) and finishes at two file outbound adapters (at the bottom right). In the middle there are three channels (the icons that look like a “tube”) and the other components that process the data.

I wanted to reproduce the same process described in “Introduction to Spring Batch”, but using Spring Integration. The aim is to process a file from FlightRadar and produce two files containing the number of flights from and to each airport.

The steps are:

  • Read source file;
  • Split the lines in multiple messages;
  • Aggregate the messages based on the departure and the destination airports;
  • Write two files, one for every aggregation.
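The four steps above can be sketched in plain Java before bringing Spring into the picture. This is only an illustration under simplifying assumptions: the class `PipelineSketch` and its methods are hypothetical, the file is replaced by an in-memory string, and each line is assumed to be a bare `FROM,TO` pair instead of a real FlightRadar record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java outline of the pipeline: split, aggregate twice, format.
final class PipelineSketch {

    // Step 2: split the file content into one entry per line.
    static List<String> split(String content) {
        return new ArrayList<>(Arrays.asList(content.split("\\r?\\n")));
    }

    // Step 3: count the lines per airport; column 0 is the departure, column 1 the destination.
    static Map<String, Integer> aggregate(List<String> lines, int column) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length > column) {
                counts.merge(fields[column], 1, Integer::sum);
            }
        }
        return counts;
    }

    // Step 4: format one aggregation as CSV text, ready to be written to a file.
    static String toCsv(Map<String, Integer> counts) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.append(e.getKey()).append(',').append(e.getValue()).append("\r\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Step 1 (reading the source file) is replaced by an in-memory string here.
        List<String> lines = split("TUN,AMS\nOTP,GVA\nBEY,GVA");
        System.out.print(toCsv(aggregate(lines, 0))); // departures
        System.out.print(toCsv(aggregate(lines, 1))); // destinations
    }
}
```

The `TreeMap` is used here only to get a stable, sorted order in the output; the same list of lines feeds both aggregations, which is the role a router plays in the Spring Integration version.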

So, let’s start with the code. The process is defined in an XML file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xmlns:feed="http://www.springframework.org/schema/integration/feed"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/feed http://www.springframework.org/schema/integration/feed/spring-integration-feed.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <file:inbound-channel-adapter id="fileInboundAdapter" directory="/home/working/SpringBatchFlightRadar/input/">
        <int:poller fixed-rate="5000"></int:poller>
    </file:inbound-channel-adapter>

    <file:file-to-string-transformer input-channel="fileInboundAdapter" output-channel="toStrings"/>

    <int:channel id="toStrings"/>

    <int:splitter ref="SplitFile" input-channel="toStrings" output-channel="aggregateOutboundChannel" />

    <bean id="SplitFile" class="it.blog.springintegration.SplitFile" />

    <int:channel id="aggregateOutboundChannel" />

    <int:recipient-list-router id="customRouter" input-channel="aggregateOutboundChannel">
        <int:recipient channel="aggregateOutboundChannelFrom"/>
        <int:recipient channel="aggregateOutboundChannelTo"/>
    </int:recipient-list-router>

    <int:channel id="aggregateOutboundChannelFrom" />
    <int:channel id="aggregateOutboundChannelTo" />

    <int:chain input-channel="aggregateOutboundChannelFrom" output-channel="FileOutputFrom">
        <int:aggregator ref="flightAggregator" method="aggregateFrom" />
        <int:transformer ref="flightTransformer" ></int:transformer>
    </int:chain>

    <int:chain input-channel="aggregateOutboundChannelTo" output-channel="FileOutputTo">
        <int:aggregator ref="flightAggregator" method="aggregateTo" />
        <int:transformer ref="flightTransformer" ></int:transformer>
    </int:chain>

    <bean id="flightAggregator" class="it.blog.springintegration.FlightAggregator" />
    <bean id="flightTransformer" class="it.blog.springintegration.FlightTransformer" />

    <file:outbound-channel-adapter id="FileOutputFrom" directory="/home/working/SpringBatchFlightRadar/output/" filename-generator="nowFileNameGeneratorFrom" />

    <file:outbound-channel-adapter id="FileOutputTo" directory="/home/working/SpringBatchFlightRadar/output/" filename-generator="nowFileNameGeneratorTo" />

    <bean id="nowFileNameGeneratorFrom" class="it.blog.springintegration.NowFileNameGenerator">
        <property name="suffix" value="From" />
    </bean>

    <bean id="nowFileNameGeneratorTo" class="it.blog.springintegration.NowFileNameGenerator">
        <property name="suffix" value="To" />
    </bean>

</beans>

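file:inbound-channel-adapter: reads the source file; the poller checks the input directory every 5 seconds.
file:file-to-string-transformer: converts the payload from a File to a String.
int:splitter and int:recipient-list-router: split the payload into multiple messages, where every message has a payload with a single line of the file; the router then sends each message to two channels, so the process is split in two branches.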

The bean “SplitFile” is defined as:


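import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.messaging.Message;

public class SplitFile extends AbstractMessageSplitter {

    // Returns one element per line of the file; each element becomes the payload of a new message
    @Override
    protected List<String> splitMessage(Message<?> message) {
        String file = (String) message.getPayload();
        // Accept both Windows (\r\n) and Unix (\n) line endings
        String[] lines = file.split("\\r?\\n");
        return new ArrayList<>(Arrays.asList(lines));
    }
}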
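The behaviour of the `\\r?\\n` expression passed to `String.split` can be checked in isolation. The following is a small sketch with a hypothetical class name, `LineSplitDemo`, that is not part of the project:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing how the "\\r?\\n" expression behaves.
final class LineSplitDemo {

    static List<String> lines(String content) {
        return Arrays.asList(content.split("\\r?\\n"));
    }

    public static void main(String[] args) {
        System.out.println(lines("a\r\nb\nc")); // Windows and Unix line endings mixed
        System.out.println(lines("a\nb\n"));    // trailing newline
        System.out.println(lines("a\n\nb"));    // blank line in the middle
    }
}
```

A trailing newline does not produce an extra element, because `String.split` drops trailing empty strings, while a blank line in the middle of the content yields an empty string.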

The two int:chain elements: the lines are aggregated into a HashMap and then formatted for a CSV file.

The FlightAggregator class is:


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.messaging.Message;

public class FlightAggregator {

    // Matches one flight record: group 1 is the departure airport, group 2 the destination
    private static final String FLIGHT_PATTERN = ",\"\\w+\":\\[\"\\w+\",\\d+\\.\\d+,\\d+\\.\\d+,\\d+,\\d+,\\d+,\"\\d+\",\"\\w+-\\w+\",\"\\w*\",\"\\D+\",\\d+,\"([A-Z]{3})\",\"([A-Z]{3})\"";

    private static final Pattern PATTERN = Pattern.compile(FLIGHT_PATTERN);

    // Counts the flights per departure airport
    public Map<String, Integer> aggregateFrom(List<Message<?>> reqlist) {
        return countByGroup(reqlist, 1);
    }

    // Counts the flights per destination airport
    public Map<String, Integer> aggregateTo(List<Message<?>> reqlist) {
        return countByGroup(reqlist, 2);
    }

    // Shared logic: parse every payload and count the occurrences of the given regex group
    private Map<String, Integer> countByGroup(List<Message<?>> reqlist, int group) {

        System.out.println("Total messages to send: " + reqlist.size());

        Map<String, Integer> request = new HashMap<String, Integer>();

        for (Message<?> mess : reqlist) {

            String payload = (String) mess.getPayload();

            // Parse the string; lines that do not match the pattern are ignored
            Matcher m = PATTERN.matcher(payload);
            if (m.find()) {
                String key = m.group(group);
                Integer count = request.get(key);
                request.put(key, count == null ? 1 : count + 1);
            }
        }

        return request;
    }
}
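To see what the two capturing groups extract, the expression can be tried on single records outside Spring. The sketch below uses a hypothetical class, `FlightPatternDemo`, with a copy of the expression from FlightAggregator in which every decimal point is escaped; the two input strings are taken from the sample source file shown later in this article.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class: extracts the two airport codes from one record.
final class FlightPatternDemo {

    // Copy of the expression used by FlightAggregator, with every decimal point escaped.
    static final Pattern PATTERN = Pattern.compile(
        ",\"\\w+\":\\[\"\\w+\",\\d+\\.\\d+,\\d+\\.\\d+,\\d+,\\d+,\\d+,\"\\d+\",\"\\w+-\\w+\",\"\\w*\","
        + "\"\\D+\",\\d+,\"([A-Z]{3})\",\"([A-Z]{3})\"");

    // Returns {departure, destination}, or null when the line does not match.
    static String[] airports(String line) {
        Matcher m = PATTERN.matcher(line);
        return m.find() ? new String[] { m.group(1), m.group(2) } : null;
    }

    public static void main(String[] args) {
        String matched = ",\"acc16ef\":[\"02A1CC\",47.0789,6.4517,355,40000,456,\"4020\",\"F-LSGG2\",\"B736\",\"TS-IOM\",1472197715,\"TUN\",\"AMS\",\"TU880\",0,-64,\"TAR880\",0]";
        String skipped = ",\"acb720c\":[\"AB0A9D\",46.1148,14.6011,137,39000,469,\"3416\",\"F-LOWG4\",\"A333\",\"N810NW\",1472197713,\"JFK\",\"ATH\",\"DL414\",0,0,\"DAL414\",0]";

        String[] a = airports(matched);
        System.out.println(a[0] + " -> " + a[1]);         // TUN -> AMS
        System.out.println(airports(skipped) == null);    // true
    }
}
```

The second record is not matched because the `\D+` part of the expression accepts only non-digit characters, and the value in that position, `N810NW`, contains digits. Records like this one are therefore not counted by the aggregator.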

The class FlightTransformer is:

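import java.util.Map;

import org.springframework.integration.annotation.Transformer;

public class FlightTransformer {

    // Formats the airport counters as CSV text, one "airport,count" row per line
    @Transformer
    public String mapToString(Map<String, Integer> payload) {
        StringBuilder output = new StringBuilder();

        for (Map.Entry<String, Integer> entry : payload.entrySet()) {
            output.append(entry.getKey());
            output.append(",");
            output.append(entry.getValue());
            output.append("\r\n");
        }

        return output.toString();
    }
}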

The two file:outbound-channel-adapter elements: the transformed tables are written to the destination files. The filename-generator bean defines the name of each generated file:


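import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.integration.file.DefaultFileNameGenerator;
import org.springframework.messaging.Message;

public class NowFileNameGenerator extends DefaultFileNameGenerator {

    // Appended to the timestamp, e.g. "From" or "To"
    private String suffix;

    // Builds a name such as 2016_09_16_04_36From.csv; "hh" is the 12-hour clock
    @Override
    public String generateFileName(Message<?> message) {
        return new SimpleDateFormat("yyyy_MM_dd_hh_mm").format(new Date()) + suffix + ".csv";
    }

    public String getSuffix() {
        return suffix;
    }

    public void setSuffix(String suffix) {
        this.suffix = suffix;
    }
}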
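One detail of the date pattern is worth a quick check: `hh` is the 12-hour clock, so a morning and an afternoon run can produce the same file name. The sketch below compares it with the 24-hour `HH`; the class `FileNameClockDemo` is hypothetical and the date is picked arbitrarily.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;

// Hypothetical demo class comparing the 12-hour ("hh") and 24-hour ("HH") patterns.
final class FileNameClockDemo {

    static String name(String pattern, Date when, String suffix) {
        return new SimpleDateFormat(pattern, Locale.ROOT).format(when) + suffix + ".csv";
    }

    public static void main(String[] args) {
        // 16 September 2016 at 16:36, in the default time zone
        Date afternoon = new GregorianCalendar(2016, Calendar.SEPTEMBER, 16, 16, 36).getTime();
        System.out.println(name("yyyy_MM_dd_hh_mm", afternoon, "From")); // 2016_09_16_04_36From.csv
        System.out.println(name("yyyy_MM_dd_HH_mm", afternoon, "From")); // 2016_09_16_16_36From.csv
    }
}
```

If distinct names for morning and afternoon runs matter, switching the pattern to `HH` would be one option.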

Let’s run the process!


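import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource("/flightradar.xml")
public class FlightRadar {

    public static void main(String[] args) throws Exception {
        // Start the context; the poller begins scanning the input directory
        ConfigurableApplicationContext ctx = new SpringApplication(FlightRadar.class).run(args);
        System.out.println("Hit Enter to terminate");
        System.in.read();
        ctx.close();
    }

}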

From a source file like this:


,"acc16ef":["02A1CC",47.0789,6.4517,355,40000,456,"4020","F-LSGG2","B736","TS-IOM",1472197715,"TUN","AMS","TU880",0,-64,"TAR880",0]
,"acc1933":["4A08E6",45.2368,13.3443,276,40000,455,"7343","F-LIBP2","B737","YR-BGF",1472197713,"OTP","GVA","RO351",0,0,"ROT351V",0]
,"acbf08e":["748051",45.6832,9.5044,275,40000,468,"1000","F-LIMJ1","A332","OD-MEE",1472197715,"BEY","GVA","ME213",0,0,"MEA213",0]
,"acc21bc":["471F8B",47.2388,11.0219,103,39025,456,"2123","F-LIPE3","A320","HA-LYP",1472197713,"BVA","SOF","W64322",0,0,"WZZ75",0]
,"acc1f23":["3C4899",46.2110,6.5653,210,39000,443,"1124","F-LSZB1","A320","D-ABDY",1472197710,"TXL","PMI","AB3152",0,64,"BER4JG",0]
,"acb720c":["AB0A9D",46.1148,14.6011,137,39000,469,"3416","F-LOWG4","A333","N810NW",1472197713,"JFK","ATH","DL414",0,0,"DAL414",0]
,"acc24e5":["400FE3",47.2380,8.4567,124,39000,438,"7661","F-LSZH3","A319","G-EZBT",1472197715,"CDG","VCE","U23793",0,0,"EZY3793",0]
...
...

The summary files are:


2016_09_16_04_36From.csv
ZAG,2
LGW,13
IST,6
HHN,1
CAG,3
BXO,1
...
...
2016_09_16_04_36To.csv
ZAG,1
IST,6
TLN,1
PSA,1
VCE,10
...
...

The Spring Integration reference documentation is available on the Spring website. I suggest you have a look; it’s well worth the time spent reading it.

Also, have a look at the GitHub repository https://github.com/spring-projects/spring-integration-samples.

The source of the project is available on GitHub as SpringIntegrationFlightRadar.
