Spring distributed transaction with Atomikos and ActiveMQ

How can we put together different resource in a unique and atomic transaction? We’re speaking about Jta and distributed transactions, not only with Dao resources but also including a Jms resource.

It’s very frequently, nowadays, to have to deal with more than one resource (a single Database) in a single process. For example, a booking process includes different steps which involve different resources (make the reservation, pay the bill in advance, send the invoice, etc…). These resources could be one or more databases and, why not, a messaging service.

I’d like to describe how it works in this example.

The components involved in the solution are:

  • ActiveMQ (I used version 5.11.1);
  • Derby Embedded Database;
  • Atomikos (library version 3.9.0);
  • Spring MVC (version 3.27);

I’m going to implement a solution for a retail shop. The picture below shows the process flow:

atomikos_ok

So, the customer makes an order and the operator inserts in into a Database; after inserting the order, the Warehouse is updated and the order is ready for being delivered.

That’s when everything goes well, but, as we well know, we don’t live in a perfect world, so that’s what could happen:

atomikos

Despite we have done a good job, something might goes wrong during the delivery process (e.g. payment service out of order) and we need to rollback the entire process. We’ll do it in a unique, atomic, transaction.

First of all, let’s get a view at ActiveMQ installation.

Once downloaded the version compliant with your platform (http://activemq.apache.org/activemq-5111-release.html) I’ve added a Jaas Authentication Plugin in order to implement the authentication layer.

In the file conf/activemq.xml add the line:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
...
        <plugins>
          <!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
          <jaasAuthenticationPlugin con<span style="color: #000000;">JTA/XA-enabled JMS </span>ion="activemq" />

          <!-- lets configure a destination based authorization mechanism -->
          <authorizationPlugin>


<map>
              <authorizationMap>
                <authorizationEntries>
                  <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                  <authorizationEntry queue="ORDER_CREATED" read="users" write="users" admin="users" />

                  <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                  <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users"
write="guests,users" admin="guests,users"/>

                </authorizationEntries>

              </authorizationMap>
            </map>


          </authorizationPlugin>

        </plugins>
...

You can get more information about this configuration at this page (http://activemq.apache.org/security.html).

Pay attention at the configuration=”activemq”; the name specified in the configuration must match the configuration name in the file conf/login.config.

Adding a user that will consume, read and administrate the queue named ORDER_CREATED by editing the files conf/users.properties:

employ123=password123

and adding it into the conf/groups.properties file:

users=employ123

Just run it at the command line (bin/activemq start) and browse the administration console http://localhost:8161/admin/ (user: admin; password: admin).

Once created the Queue “ORDER_CREATED” you can see something like this screenshot

activemq1

Well, that’s all for ActiveMQ.

The other part of the solution is Atomikos (http://www.atomikos.com/Main/TransactionsEssentials).

Atomikos is a library that allows distributed transactions through Jta implementation. I think it’s very easy to use and to configure and it’s a valid solution to get a distributed transactions outside an Java EE application server (JBoss, GlassFish, etc..)

I used it defining a Transaction Manager inside a spring configuration file as a bean.

<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce" />

<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" depends-on="userTransactionService" />

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
     <property name="transactionManager" ref="atomikosTransactionManager" />
     <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

And the Data sources:

<bean id="ordersDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
	<qualifier value="atomikosDataSourceOrder" />
	<property name="uniqueResourceName" value="xaOrders" />
	<property name="xaDataSourceClassName" value="org.apache.derby.jdbc.EmbeddedXADataSource" />
	<property name="xaProperties">
		<props>
			<prop key="databaseName">C:/progetti/Blog/SpringAtomikos/db/orders</prop>
			<prop key="createDatabase">create</prop>
		</props>
	</property>
	<property name="poolSize" value="1" />
</bean>
...
<bean id="wareHouseDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
	<qualifier value="atomikosDataSourceWareHouse" />
	<property name="uniqueResourceName" value="xaWareHouse" />
	<property name="xaDataSourceClassName" value="org.apache.derby.jdbc.EmbeddedXADataSource" />
	<property name="xaProperties">
		<props>
			<prop key="databaseName">C:/progetti/Blog/SpringAtomikos/db/warehouse</prop>
			<prop key="createDatabase">create</prop>
		</props>
	</property>
	<property name="poolSize" value="1" />
</bean>

As I said before, I used a Derby Db, it’s enough to change the driver according to the Dao providers which provide their XA DataSource driver.
Having said that, even Jms has its XA driver as visible in the snippet below.

<!-- ActiveMQ connection factory -->
<!-- JTA/XA-enabled JMS -->
<bean id="atomikosJmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
	<property name="uniqueResourceName" value="ActiveMQXA" />
	<property name="xaConnectionFactory">
		<bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
			<property name="brokerURL" value="tcp://127.0.0.1:61616" />
			<property name="userName" value="employ123"/>
                	<property name="password" value="password123"/>
		</bean>
	</property>
	<property name="poolSize" value="1" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
	<property name="connectionFactory" ref="atomikosJmsConnectionFactory" />
	<property name="receiveTimeout" value="2000" />
	<property name="sessionTransacted" value="true" />
	<property name="sessionAcknowledgeMode" value="0" />
</bean>

The user name and password are the same of the user created before as “Application User”.

The complete Configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- JPAAccountDAO has JPA annotations to access EntityManager -->
	<context:annotation-config />

	<bean id="ordersDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
		<qualifier value="atomikosDataSourceOrder" />
		<property name="uniqueResourceName" value="xaOrders" />
		<property name="xaDataSourceClassName" value="org.apache.derby.jdbc.EmbeddedXADataSource" />
		<property name="xaProperties">
			<props>
				<prop key="databaseName">C:/progetti/Blog/SpringAtomikos/db/orders</prop>
				<prop key="createDatabase">create</prop>
			</props>
		</property>
		<property name="poolSize" value="1" />
	</bean>

	<bean id="ordersEntityManager" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
		<property name="dataSource" ref="ordersDataSource" />
		<property name="persistenceUnitName" value="Orders" />
	</bean>

	<bean id="wareHouseDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
		<qualifier value="atomikosDataSourceWareHouse" />
		<property name="uniqueResourceName" value="xaWareHouse" />
		<property name="xaDataSourceClassName" value="org.apache.derby.jdbc.EmbeddedXADataSource" />
		<property name="xaProperties">
			<props>
				<prop key="databaseName">C:/progetti/Blog/SpringAtomikos/db/warehouse</prop>
				<prop key="createDatabase">create</prop>
			</props>
		</property>
		<property name="poolSize" value="1" />
	</bean>

	<bean id="wareHouseEntityManager" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
		<property name="dataSource" ref="wareHouseDataSource" />
		<property name="persistenceUnitName" value="WareHouse" />
	</bean>

	<bean id="ordersController" class="it.demo.springatomikos.controller.OrdersController" />
	<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
		<property name="transactionManager" ref="atomikosTransactionManager" />
		<property name="userTransaction" ref="atomikosUserTransaction" />
	</bean>

	<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce" />

	<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" depends-on="userTransactionService" />

	<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
		<property name="transactionTimeout" value="300" />
	</bean>

	<!-- Reader in base al BeanName (collegato alla chiamata URL) -->
	<bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />

	<bean id="orderViewJson" class="org.springframework.web.servlet.view.json.MappingJacksonJsonView" />

	<bean id="ordersDao" class="it.demo.springatomikos.dao.OrdersDao" />
	<bean id="wareHouseDao" class="it.demo.springatomikos.dao.WareHouseDao" />

	<bean id="store" class="it.demo.springatomikos.component.Store" scope="singleton">
		<property name="ordersDao" ref="ordersDao" />
		<property name="wareHouseDao" ref="wareHouseDao" />
	</bean>
	<tx:annotation-driven transaction-manager="transactionManager" />

	<!-- ActiveMQ connection factory -->
	<!-- JTA/XA-enabled JMS -->
	<bean id="atomikosJmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
		<property name="uniqueResourceName" value="ActiveMQXA" />
		<property name="xaConnectionFactory">
			<bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
				<property name="brokerURL" value="tcp://127.0.0.1:61616" />
				<property name="userName" value="employ123"/>
            	<property name="password" value="password123"/>
			</bean>
		</property>
		<property name="poolSize" value="1" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="atomikosJmsConnectionFactory" />
		<property name="receiveTimeout" value="2000" />
		<property name="sessionTransacted" value="true" />
		<property name="sessionAcknowledgeMode" value="0" />
	</bean>
</beans>

In the persistence properties file are configured the two units:

<persistence version="2.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">

	<persistence-unit name="Orders" transaction-type="JTA">
		<provider>org.hibernate.ejb.HibernatePersistence</provider>
		<class>it.demo.springatomikos.bean.Order</class>
		<exclude-unlisted-classes>true</exclude-unlisted-classes>
		<properties>
			<property name="hibernate.transaction.manager_lookup_class" value="com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup" />
			<property name="hibernate.show_sql" value="true" />
			<property name="hibernate.format_sql" value="true" />
			<property name="hibernate.hbm2ddl.auto" value="create" />
		</properties>
	</persistence-unit>

	<persistence-unit name="WareHouse" transaction-type="JTA">
		<provider>org.hibernate.ejb.HibernatePersistence</provider>
		<class>it.demo.springatomikos.bean.WareHouse</class>
		<exclude-unlisted-classes>true</exclude-unlisted-classes>
		<properties>
			<property name="hibernate.transaction.manager_lookup_class" value="com.atomikos.icatch.jta.hibernate3.TransactionManagerLookup" />
			<property name="hibernate.show_sql" value="true" />
			<property name="hibernate.format_sql" value="true" />
			<property name="hibernate.hbm2ddl.auto" value="create" />
		</properties>
	</persistence-unit>
</persistence>

And the two entities defined in Java:

@Entity
@Table(name = "ORDERS")
public class Order implements Serializable {

 private Long id;
 private String product;
 private int quantity;

 @Id
 @GeneratedValue
 @Column(name = "ID")
 public Long getId() {
	return id;
 }
 public void setId(Long id) {
	this.id = id;
 }
 @Column(name = "PRODUCT")
 public String getProduct() {
	return product;
 }
 public void setProduct(String product) {
 	this.product = product;
 }
 @Column(name = "QUANTITY")
 public int getQuantity() {
 	return quantity;
 }
 public void setQuantity(int quantity) {
 	this.quantity = quantity;
 }
 @Override
 public String toString()
 {
 	return "id:" + id + "\n" +
 			"product:" + product + "\n" +
 			"quantity:" + quantity;
 }
}

And the other entity:

@Entity
@Table(name = "WAREHOUSE")
@NamedQueries({
 @javax.persistence.NamedQuery(
 name="emptyWareHouse",
 query="DELETE FROM WareHouse"
 )
})
public class WareHouse {

 private String product;
 private int quantity;
 @Id
 @Column(name = "PRODUCT")
 public String getProduct() {
 	return product;
 }
 public void setProduct(String product) {
 	this.product = product;
 }
 @Column(name = "QUANTITY")
 public int getQuantity() {
 	return quantity;
 }
 public void setQuantity(int quantity) {
 	this.quantity = quantity;
 }
}

The Dao controllers are OrdersDao

public class OrdersDao{

	@PersistenceContext(unitName="ordersEntityManager")
	private EntityManager em;
	
	@Transactional(readOnly = true)
	public void insertOrder(Order order) {
		em.persist(order);
	}	
}

and WareHouseDao

public class WareHouseDao{

	@PersistenceContext(unitName="wareHouseEntityManager")
	private EntityManager em;
	
	@Transactional(readOnly = true)
	public WareHouse getOrderWareHouse(String product) {
		return em.find(WareHouse.class, product);
	}
	
	@Transactional(readOnly = true)
	public void emptyWareHouse()
	{
		Query query = em.createNamedQuery("emptyWareHouse");     
		query.executeUpdate(); 
	}
	
	@Transactional(readOnly = true)
	public void populateWareHouse(WareHouse wareHouse) {
		em.persist(wareHouse);
		
	}

	public void updateWareHouse(WareHouse wareHouse) {
		em.merge(wareHouse);		
	}
}

The core of the process is the class named Store with its method purchaseProduct().

public class Store {

private OrdersDao ordersDao;
private WareHouseDao wareHouseDao;

@Autowired
private JmsTemplate jmsTemplate;
private final boolean getError = false;

public OrdersDao getOrdersDao() {
return ordersDao;
}
public void setOrdersDao(OrdersDao ordersDao) {
this.ordersDao = ordersDao;
}
public WareHouseDao getWareHouseDao() {
return wareHouseDao;
}
public void setWareHouseDao(WareHouseDao wareHouseDao) {
this.wareHouseDao = wareHouseDao;
}
public void emptyWareHouse() {
wareHouseDao.emptyWareHouse();
}
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void populateWareHouse() {
WareHouse whArticle1 = new WareHouse();
whArticle1.setProduct("CZ-123");
whArticle1.setQuantity(5);

wareHouseDao.populateWareHouse(whArticle1);

WareHouse whArticle2 = new WareHouse();
whArticle2.setProduct("AH-987");
whArticle2.setQuantity(10);

wareHouseDao.populateWareHouse(whArticle2);
}
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public Order purchaseProduct(Order order) throws Exception {
// Insert Order
ordersDao.insertOrder(order);
// Read the product from WareHouse
WareHouse wareHouseProduct = wareHouseDao.getOrderWareHouse(order
.getProduct());

// Remove the quantity from WareHouse
wareHouseProduct.setQuantity(wareHouseProduct.getQuantity()
- order.getQuantity());

// Update Warehouse
wareHouseDao.updateWareHouse(wareHouseProduct);

jmsTemplate.convertAndSend("ORDER_CREATED", order);

if (getError)
throw new Exception("WareHouse Exception!!");
else
return order;
}
}

The method does three steps:

  • Insert the order;
  • Update the data Warehouse;
  • Send Sms message for the delivery;

Have a look at the definition @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class).
It means that all operation are in transaction and, well, the method exception generates the complete rollback of all the actions so far executed.

As you can see, this is a very easy sample and it has been kept easy for the purpose of illustrate the technologies. Atomikos and ActiveMQ have a lot of different configurations to catch all the different solution needs.

A special thanks to Niels Peter Strandberg who wrote this amazing sample (that inspired me) https://github.com/nielspeter/atomikos-jta-xadisk-jpa-jms-example/

Advertisements

3 thoughts on “Spring distributed transaction with Atomikos and ActiveMQ

  1. Pingback: Spring distributed transaction with Atomikos and ActiveMQ | Dinesh Ram Kali.

    • Thank you Chris. Yes, very trivial code, I used getError to switch the rollback mode during my test.
      Good to know that people who read these articles are very careful!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s