Introduction to Spring Batch

How to accomplish a batch job with Spring Batch, reusing the same Spring technology already adopted for web solutions.

It’s easy to get started with, and you can add an administration monitor that helps keep the process execution under control.

Importing or exporting data for business analysis is very common nowadays. These are tasks which run in a separate process, without a user trigger, and produce the result of the data processing.

There are many different frameworks to get these tasks completed, from a simple batch process up to an orchestration framework (Mule ESB, for example).

In this article, I’d like to illustrate an easy approach to data processing: importing data from a file and producing two different files. I’ll do it using Spring Batch, from the Spring framework.

You can find everything (or almost everything) in the official Spring Batch guide.

Let’s talk about the example: I’ve got a feed file from FlightRadar24 which contains flight data (latitude, longitude, flight number, origin airport, destination airport, …). From this file I’m going to extract two reports indicating the number of flights departing from and arriving at each airport.

The steps are:

  1. Read the file and import it into a SQL database (the vendor doesn’t matter).
  2. Compute the number of flights departing from and arriving at each airport.
  3. Write the results to two files.

Graphically, using the Eclipse batch-graph visualization, it looks like this:

[Image: batch job graph]

First step: importing the file.


<batch:step id="ImportFile" next="GenerateOutput">
    <batch:tasklet>
        <batch:chunk reader="flightReader" processor="flightProcessor"
                     writer="mysqlItemWriter" commit-interval="3" />
    </batch:tasklet>
</batch:step>

We define a chunk which reads, processes and writes the data from the source file; every 3 items, the processed data are written to the database.

The reader:


<bean id="flightReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

<property name="resource" value="file:input/#{jobParameters['filename']}" />

<!-- Skip the first line of the file because this is the header that defines the fields -->
<property name="linesToSkip" value="1" />

<!-- Defines how we map lines to objects -->
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

<!-- The lineTokenizer divides individual lines up into units of work -->
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

<!-- Names of the CSV columns -->
<property name="names" value="field0,field1, field2,field3, field4,field5, 
field6,field7, field8,field9, field10,field11, field12,field13, 
field14,field15, field16,field17, field18" />
</bean>
</property>

<property name="fieldSetMapper">
<bean class="it.blog.springbatch.flightradar.mapper.FlightFieldSetMapper" />
</property>
</bean>
</property>
</bean>
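
Before moving on, here is a minimal sketch of the “Flight” bean and the field-set mapper. These are illustrative versions, not the repository code: in particular, which “fieldN” column holds the flight number and the airports is an assumption here (check the repository for the real mapping). Note that the bean property names must match the SQL named parameters used by the writer further below.

// Flight.java - minimal domain bean (sketch; the repository version may differ).
// Its property names must match the :flightNumber, :from, :to and :fileImport
// named parameters used by the JdbcBatchItemWriter below.
public class Flight {

    private String flightNumber;
    private String from;
    private String to;
    private String fileImport;

    public String getFlightNumber() { return flightNumber; }
    public void setFlightNumber(String flightNumber) { this.flightNumber = flightNumber; }
    public String getFrom() { return from; }
    public void setFrom(String from) { this.from = from; }
    public String getTo() { return to; }
    public void setTo(String to) { this.to = to; }
    public String getFileImport() { return fileImport; }
    public void setFileImport(String fileImport) { this.fileImport = fileImport; }
}

// FlightFieldSetMapper.java - binds the tokenized CSV line to a Flight bean.
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class FlightFieldSetMapper implements FieldSetMapper<Flight> {

    @Override
    public Flight mapFieldSet(FieldSet fieldSet) {
        Flight flight = new Flight();
        flight.setFlightNumber(fieldSet.readString("field13")); // assumed column
        flight.setFrom(fieldSet.readString("field11"));         // assumed origin column
        flight.setTo(fieldSet.readString("field12"));           // assumed destination column
        return flight;
    }
}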

The processor:


<bean id="flightProcessor" class="it.blog.springbatch.flightradar.process.FlightProcessor" scope="step">
<property name="fileImport" value="#{jobParameters['filename']}" />
</bean>
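
The processor itself is tiny. Here is a sketch consistent with the bean definition above (the repository class may differ in detail): the “fileImport” property receives the filename job parameter and is copied onto every item.

import org.springframework.batch.item.ItemProcessor;

public class FlightProcessor implements ItemProcessor<Flight, Flight> {

    // Injected from #{jobParameters['filename']} via the bean definition above
    private String fileImport;

    public void setFileImport(String fileImport) {
        this.fileImport = fileImport;
    }

    @Override
    public Flight process(Flight item) throws Exception {
        // Enrich each record with the name of the file it came from
        item.setFileImport(fileImport);
        return item;
    }
}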

And, finally, the writer:


<bean id="mysqlItemWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource" />
<property name="sql">
<value>
<![CDATA[Insert into FLIGHT_REPORT(number, `from`, `to`, file_import) 
values (:flightNumber, :from, :to, :fileImport);]]>
</value>
</property>
<!-- It will take care matching between object property and sql name parameter -->
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
</property>
</bean>

The data are stored in a SQL table with this structure:


CREATE TABLE `flight_report` (
    `number` varchar(10) DEFAULT NULL,
    `from` varchar(3) DEFAULT NULL,
    `to` varchar(3) DEFAULT NULL,
    `file_import` varchar(45) DEFAULT NULL
)

Briefly, each line of the file is mapped into a “Flight” bean through the “FlightFieldSetMapper” class, the processor adds the filename to the bean, and every 3 items the records are written to the database by the mysqlItemWriter.

The core concept is to split each step into two or three parts, following a common pattern:

  • a reader (it must implement the “ItemReader” interface);
  • optionally, a processor (it must implement the “ItemProcessor” interface);
  • a writer (it must implement the “ItemWriter” interface; a trivial example is sketched just below).
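
To make the contract concrete, here is a trivial custom writer. It is illustrative only (the example project uses the stock JdbcBatchItemWriter instead); the point is that the writer receives the whole chunk, here up to commit-interval = 3 items, in a single call (Spring Batch 3.x signature).

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class LoggingFlightWriter implements ItemWriter<Flight> {

    @Override
    public void write(List<? extends Flight> items) throws Exception {
        // Called once per chunk with up to commit-interval items
        for (Flight flight : items) {
            System.out.println("Writing flight " + flight.getFlightNumber());
        }
    }
}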

Once the data are imported into the database, it’s time to extract them by group. The two extractions run in parallel thanks to the “split” element that wraps the step declarations.


<batch:split id="GenerateOutput">
    <batch:flow>
        <batch:step id="GenerateReportFrom">
            <batch:tasklet>
                <batch:chunk reader="mySqlItemReaderFrom" writer="flatFileItemWriterFrom"
                             commit-interval="3" />
            </batch:tasklet>
        </batch:step>
    </batch:flow>
    <batch:flow>
        <batch:step id="GenerateReportTo">
            <batch:tasklet>
                <batch:chunk reader="mySqlItemReaderTo" writer="flatFileItemWriterTo"
                             commit-interval="3" />
            </batch:tasklet>
        </batch:step>
    </batch:flow>
</batch:split>

The steps “GenerateReportFrom” and “GenerateReportTo” are almost identical; the only difference is the GROUP BY column in the SQL query.

For this reason, I show only the “From” flow.

The reader:


<bean id="mySqlItemReaderFrom" 
class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="dataSource" />
<property name="sql" value="SELECT `from`, count(*) as counter 
FROM stresstest.flight_report group by `from`;" />
<property name="rowMapper">
<bean class="it.blog.springbatch.flightradar.mapper.AirportRowMapper" />
</property>
</bean>
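
A sketch of the “Airport” bean and the row mapper (the real classes are in the repository; this is an illustrative version). The property names “from” and “count” must match the BeanWrapperFieldExtractor configuration of the writer below, and the mapper reads the “counter” column alias defined in the query above.

// Airport.java - one row of the report: airport code plus flight count.
public class Airport {

    private String from;
    private int count;

    public String getFrom() { return from; }
    public void setFrom(String from) { this.from = from; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}

// AirportRowMapper.java - maps each row of the GROUP BY query to an Airport.
import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class AirportRowMapper implements RowMapper<Airport> {

    @Override
    public Airport mapRow(ResultSet rs, int rowNum) throws SQLException {
        Airport airport = new Airport();
        airport.setFrom(rs.getString("from"));
        airport.setCount(rs.getInt("counter")); // alias from "count(*) as counter"
        return airport;
    }
}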

The writer:


<bean id="flatFileItemWriterFrom" 
class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:output/output_from_#{jobParameters['filename']}.csv" />
<property name="appendAllowed" value="true" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="from, count" />
</bean>
</property>
</bean>
</property>
</bean>

The data are extracted from the database into “Airport” beans by the “AirportRowMapper” class. After that, the data are written to a file by extracting the “from” and “count” fields.

How to run the process?

Two options are available: on demand, running the process as a console application, or as a scheduled process.

The first option is a classic main application:


import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProcessFlight {

    public static void main(String[] args) {
        ProcessFlight obj = new ProcessFlight();
        obj.run();
    }

    private void run() {
        String[] springConfig =
            { "config/database.xml", "config/context.xml", "config/component.xml", "job/job.xml" };

        ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
        RunScheduler runner = context.getBean("runScheduler", RunScheduler.class);
        runner.run();
    }
}

and the RunScheduler class:


import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;

public class RunScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    private static final Log logger = LogFactory.getLog(RunScheduler.class);

    public void run() {

        try {
            Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();

            // Pick up the input file name from the input folder
            File folder = new File("C:/progetti/Blog/SpringBatchFlightRadar/input/");
            File[] listOfFiles = folder.listFiles();

            for (File file : listOfFiles) {
                if (file.isFile()) {
                    parameters.put("filename", new JobParameter(file.getName()));
                }
            }

            /* Add a unique parameter so each run creates a new JobInstance,
               avoiding a "JobInstance already exists" error */
            parameters.put("executionTime", new JobParameter(System.currentTimeMillis()));

            JobExecution execution = jobLauncher.run(job, new JobParameters(parameters));

            logger.info("Exit Status : " + execution.getStatus());
            logger.info("Failure exceptions : " + execution.getAllFailureExceptions());

        } catch (Exception e) {
            logger.error(e);
        }
    }
}

The second option is scheduling the process in the Spring configuration file and commenting out these lines:


/* Only needed when running without scheduling */
//RunScheduler runner = context.getBean("runScheduler", RunScheduler.class);
//runner.run();
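
For example, with annotation-driven scheduling it could look like the sketch below. This is illustrative only, not the repository’s actual configuration: it assumes <task:annotation-driven/> (or @EnableScheduling) is declared in the context, and the cron expression is just an example.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduledLauncher {

    @Autowired
    private RunScheduler runScheduler;

    // Hypothetical schedule: launch the import job every night at 2 AM
    @Scheduled(cron = "0 0 2 * * *")
    public void launch() {
        runScheduler.run();
    }
}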

Let’s run the process (scheduled or on demand, your choice) and look at the result:

AGP,7
AMS,35
ASB,7
ATH,28
AYT,7
BCN,28
BGY,35
BIO,14
BLQ,42
BRN,7
BRU,7
BSL,14
BTS,7
CDG,49
CFU,28
CIA,7
CRL,14
CTA,14
DBV,7
EFL,7
EWR,7
FCO,21
FKB,7
FLR,21
...

The first column is the IATA code and the second is the number of flights which have taken off from that airport.

The complete solution is available on GitHub: https://github.com/MarcoGhise/SpringBatch.

I suggest downloading the Spring Batch source code; it can help you understand clearly how it works.

Also, as I said in the article introduction, there is also a process monitor available which helps us keep the process status under control.

Have a little patience: I’ll talk about that in another article.

UPDATE

I think it’s quite useless to repeat the same things here; it would only confuse the reader.

So you can get all the information you need about Spring Batch Admin from this article: https://examples.javacodegeeks.com/enterprise-java/spring/spring-batch-admin-tutorial/, and the GitHub code is here: https://github.com/spring-projects/spring-batch-admin.

Enjoy it!
