Spring Jms – Produce and consume messages

The next step from my previous post (Jms – Produce and consume messages) is to use the Spring Framework to configure the JMS components. I’m going to show you two different ways to consume messages from a JMS broker: one synchronous, the other asynchronous. First, take a look at the official Spring reference (Chapter 21 – JMS). As usual, we have a Producer and a Consumer: the first writes to the queue and the second reads from it. The jars included are:

commons-logging-1.1.1.jar
fscontext.jar
imq.jar
jms.jar
spring-aop-3.1.0.RELEASE.jar
spring-asm-3.1.0.RELEASE.jar
spring-beans-3.1.0.RELEASE.jar
spring-context-3.1.0.RELEASE.jar
spring-core-3.1.0.RELEASE.jar
spring-expression-3.1.0.RELEASE.jar
spring-jdbc-3.1.0.RELEASE.jar
spring-jms-3.1.0.RELEASE.jar
spring-orm-3.1.0.RELEASE.jar
spring-tx-3.1.0.RELEASE.jar
spring-web-3.1.0.RELEASE.jar
spring-webmvc-3.1.0.RELEASE.jar

The most important file in the project is Spring’s configuration file. Take a look at it.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/tx
	http://www.springframework.org/schema/tx/spring-tx.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context.xsd
	http://www.springframework.org/schema/jms
	http://www.springframework.org/schema/jms/spring-jms.xsd">

	<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
		<property name="environment">
			<props>
				<prop key="java.naming.provider.url">file:///store1/MessageQueue/var/mq/admobjstore</prop>
				<prop key="java.naming.factory.initial">com.sun.jndi.fscontext.RefFSContextFactory</prop>
			</props>
		</property>
	</bean>

	<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiTemplate" ref ="jndiTemplate"/>
		<property name="jndiName" value="ConnectionFactory"/>
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="defaultDestination" ref="destination"/>
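		<!-- OrderQueue is a queue (point-to-point), so pubSubDomain is false; set it to true only when working with topics -->
		<property name="pubSubDomain" value="false"/>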
		<property name="deliveryPersistent" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>

	<bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">
		<property name="jndiTemplate" ref="jndiTemplate"/>
		<property name="jndiName" value="OrderQueue"/>
	</bean>

	<bean id="orderSender" class="it.samplejms.spring.OrderSender">
		<property name="jmsTemplate" ref="jmsTemplate"/>
	</bean>

<!-- Listener Asynchronous
	<bean id="orderListener" class="it.samplejms.spring.OrderListener">
		<property name="errorHandler" ref="someHandler"/>
	</bean>

	<jms:listener-container concurrency="5-10">
		<jms:listener destination="OrderQueue" ref="orderListener"/>
	</jms:listener-container>
-->
<!-- Receive Synchronous -->
	<bean id="orderReceiver" class="it.samplejms.spring.OrderReceiver">
		<property name="jmsTemplate" ref="jmsTemplate"/>
	</bean>

</beans>

The someHandler bean referenced by the commented-out listener configuration is a small class implementing Spring’s ErrorHandler interface:

package it.samplejms.spring;

import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;

@Service
public class SomeHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        System.out.println("Error in listener:" + t);
    }
}
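
To make the role of such a handler more concrete, here is a minimal JDK-only sketch of the idea: a container-like loop invokes a listener and hands any exception to an error handler instead of letting it escape. The names ErrorHandlerSketch, SimpleErrorHandler, SimpleListener and deliver are hypothetical stand-ins, not Spring classes, and Spring’s real listener container does considerably more than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that only mimics the idea of Spring's ErrorHandler.
interface SimpleErrorHandler {
    void handleError(Throwable t);
}

// Hypothetical stand-in for a message listener that may fail.
interface SimpleListener {
    void onMessage(String message) throws Exception;
}

class ErrorHandlerSketch {

    // Invokes the listener for each message; failures are passed to the handler
    // so that one bad message does not stop the remaining deliveries.
    static void deliver(List<String> messages, SimpleListener listener, SimpleErrorHandler handler) {
        for (String message : messages) {
            try {
                listener.onMessage(message);
            } catch (Exception e) {
                handler.handleError(e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        deliver(Arrays.asList("order-1", "bad", "order-2"),
                m -> {
                    if (m.equals("bad")) {
                        throw new IllegalStateException("cannot process " + m);
                    }
                    processed.add(m);
                },
                t -> errors.add(t.getMessage()));
        System.out.println("processed=" + processed + " errors=" + errors);
    }
}
```

Without a handler, the only place to react to a failing listener would be inside onMessage itself, which is why the container complains when none has been set.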

Looking inside the configuration file, a few beans are worth pointing out:

connectionFactory: the connection to the broker.
jmsTemplate: the core class of Spring’s JMS package, used for communication between the broker and the producer/consumer clients.
destination: the destination Queue (or Topic).
orderListener: the consumer in asynchronous mode.
orderReceiver: the consumer in synchronous mode.

The Producer class is the following:


package it.samplejms.spring;

import it.samplejms.bean.Order;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class OrderSender {
    private JmsTemplate jmsTemplate;

    // Sends the order to the template's default destination as an ObjectMessage.
    public void sendMessages(Order order) throws JMSException {

        final Order orderPayload = order;

        getJmsTemplate().send(new MessageCreator() {

            // Called by JmsTemplate with an open Session; the payload must be Serializable.
            public Message createMessage(Session session) throws JMSException {
                Message message = session.createObjectMessage(orderPayload);
                return message;
            }
        });
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

}

The key call is the send method of the jmsTemplate object. In synchronous mode, the method jmsTemplate.receive() is used to read messages from the queue.


package it.samplejms.spring;

import it.samplejms.bean.Order;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;

import org.springframework.jms.core.JmsTemplate;

public class OrderReceiver {
 private JmsTemplate jmsTemplate;

 public JmsTemplate getJmsTemplate() {
  return jmsTemplate;
 }

 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }

 // Blocks until a message is available on the default destination, then unwraps the Order.
 public Order Receive() throws JMSException {
  Message message = jmsTemplate.receive();
  Order order = (Order)((ObjectMessage) message).getObject();
  return order;
 }
}
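
A synchronous receive of this kind blocks the calling thread until a message is available, and it can help to see that behaviour in isolation. The sketch below is a JDK-only analogy built on java.util.concurrent.BlockingQueue, not on the JMS API: take() waits indefinitely, while poll with a timeout gives up and returns null. BlockingReceiveSketch and its method names are hypothetical. JmsTemplate offers a comparable receiveTimeout property, and when a timeout is set receive() can return null, which the cast in OrderReceiver would then need to guard against.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for a queue on a broker.
class BlockingReceiveSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String message) {
        queue.add(message);
    }

    // Blocks the calling thread until a message is available.
    String receive() throws InterruptedException {
        return queue.take();
    }

    // Waits at most timeoutMillis and returns null when nothing arrived in time.
    String receive(long timeoutMillis) throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingReceiveSketch broker = new BlockingReceiveSketch();

        // Nothing has been sent yet, so the timed receive gives up with null.
        System.out.println("empty queue: " + broker.receive(100));

        // A producer thread sends after a short delay; receive() waits for it.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            broker.send("order-123");
        });
        producer.start();
        System.out.println("received: " + broker.receive());
        producer.join();
    }
}
```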

Keep in mind that “During a synchronous receive, the calling thread blocks until a message becomes available” (from the official Spring guide). A more interesting way to consume messages is the asynchronous mode. In the Spring configuration file we have to uncomment the orderListener and listener-container parts. The listener class OrderListener implements the interface javax.jms.MessageListener. The method onMessage


public void onMessage(Message message);

is called every time a message is read from the queue.


package it.samplejms.spring;

import it.samplejms.bean.Order;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;

public class OrderListener extends org.springframework.jms.listener.DefaultMessageListenerContainer implements javax.jms.MessageListener {
 @Override
 public void onMessage(Message message) {
  try {
   Order order = (Order)((ObjectMessage) message).getObject();
   System.out.println("Order - " + order.toString());
  } catch (JMSException e1) {
   e1.printStackTrace();
  }

 }

}
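
The push model can also be illustrated without a broker. The following JDK-only sketch, with hypothetical names (AsyncListenerSketch, OrderCallback, dispatch), hands each message to a callback on a small thread pool. It loosely mirrors what concurrency="5-10" asks the listener container to do, but it is only an analogy and not how Spring implements its container.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical callback, playing the role of MessageListener.onMessage.
interface OrderCallback {
    void onMessage(String message);
}

class AsyncListenerSketch {

    // Delivers every message to the callback on one of `workers` threads and
    // returns true once all deliveries finished within the five-second wait.
    static boolean dispatch(List<String> messages, OrderCallback callback, int workers)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (String message : messages) {
            pool.submit(() -> callback.onMessage(message));
        }
        pool.shutdown();
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Thread-safe collection, because callbacks run on several threads.
        Queue<String> seen = new ConcurrentLinkedQueue<>();
        boolean done = dispatch(Arrays.asList("order-1", "order-2", "order-3"),
                m -> seen.add(Thread.currentThread().getName() + " got " + m), 2);
        System.out.println("finished=" + done + ", deliveries=" + seen.size());
    }
}
```

The point to take away is that the callback may run on several threads at once, so a listener used with a concurrency setting above 1 should not rely on unsynchronized shared state.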

The last piece of code is the driver class that runs the Producer and the Consumer.


package it.samplejms;

import java.math.BigDecimal;
import java.util.Date;

import javax.jms.JMSException;

import it.samplejms.bean.Order;
import it.samplejms.spring.OrderReceiver;
import it.samplejms.spring.OrderSender;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerProducerSpring {

 /**
  * @param args
  * @throws JMSException
  */
 public static void main(String[] args) throws JMSException {

  Order order = new Order(123, new BigDecimal(2300), 120, new Date());

  ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring-samplejms.xml"});
  OrderSender sender = (OrderSender) context.getBean("orderSender");
  sender.sendMessages(order);

  OrderReceiver receiver = (OrderReceiver) context.getBean("orderReceiver");
  System.out.println("Read Message: " + receiver.Receive().toString());
 }
}
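
The Order bean used throughout is not listed in this post. Because the producer sends it with session.createObjectMessage, it has to implement java.io.Serializable. A minimal sketch could look like the following; only the constructor shape (int, BigDecimal, int, Date) comes from the driver class, while the field names are guesses for illustration and the package declaration is left out. The small roundTrip helper pushes an instance through plain Java serialization, which is roughly what happens to the payload of an ObjectMessage.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

// Hypothetical payload class; the field names are assumptions.
class Order implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int id;
    private final BigDecimal amount;
    private final int quantity;
    private final Date date;

    Order(int id, BigDecimal amount, int quantity, Date date) {
        this.id = id;
        this.amount = amount;
        this.quantity = quantity;
        this.date = date;
    }

    int getId() {
        return id;
    }

    BigDecimal getAmount() {
        return amount;
    }

    @Override
    public String toString() {
        return "Order[id=" + id + ", amount=" + amount
                + ", quantity=" + quantity + ", date=" + date + "]";
    }

    // Serializes the order to bytes and reads it back as a new instance.
    static Order roundTrip(Order order) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(order);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Order) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Order original = new Order(123, new BigDecimal(2300), 120, new Date());
        System.out.println(roundTrip(original));
    }
}
```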

That’s all. You only have to choose between the asynchronous and the synchronous receiver. I think it is worth spending some time practising this integration.

21 thoughts on “Spring Jms – Produce and consume messages”

  1. When I’m running your sample, it’s hitting an exception :

    org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘destination’ defined in ServletContext resource [/WEB-INF/applicationContext.xml]: Invocation of init method failed; nested exception is javax.naming.NameNotFoundException: OrderQueue

    What does “OrderQueue” refer to anyway ?

  2. Hi,
    Is there a way I can add error handler to this? I have created an asynchronous listener using the concept in this blog.
    Thanks!

    • In both synchronous and asynchronous mode you can catch the JMSException in the receiver(), producer() and onMessage() methods.
      You can handle these exceptions to control your code.
      Were you asking that or have I misunderstood the question?

      • Hi,
        I get this message under certain conditions, which may be caused by an error on the other side: “execution of jms message listener failed and no errorhandler has been set”. I am catching JMSException in the asynchronous listener (the onMessage() method). However, it does not let me catch the JMSException in the Consumer (where I initialize the jndiTemplate bean –> org.springframework.jndi.JndiTemplate). I throw it from that method and I believe it is one of the thrown exceptions. I searched and was told to add an ErrorHandler, but “org.springframework.jms.core.JmsTemplate” and “org.springframework.jndi.JndiTemplate” do not have a way to specify an ErrorHandler.

      • Hi,
        I found out this answer about the message “execution of jms message listener failed and no errorhandler has been set”.
        According to this post http://stackoverflow.com/questions/8922532/execution-of-jms-message-listener-failed-and-no-errorhandler-has-been-set, I’ve edited my code updating my Listener class with
        extends org.springframework.jms.listener.DefaultMessageListenerContainer.
        The spring configuration file looks like this:

        and added the class “someHandler”. Anyway, I can’t reproduce your exception. I’m using Sun Message Queue, so maybe it depends on the Message Queue broker in use.
        Finally, I recommend having a look at Xavier Pedro’s blog. In this post http://xpadro.blogspot.it/2013/08/spring-jms-processing-messages-within.html he describes transactional message processing (you can find your exception at the end of the post).
        I hope this helps.

      • Hi,

        Thanks a lot for the help. However I cannot see your new spring configuration file:

        “I’ve edited my code updating my Listener class with
        extends org.springframework.jms.listener.DefaultMessageListenerContainer.
        The spring configuration file looks like this:

        and added the class “someHandler”. Anyway, I can’t reproduce your exception. I’m using Sun Message ”

        Can you copy/paste it again?

        Thanks,
        A.

  3. Hi Techannotation,

    Thanks for your previous help. It works fine with a bit of troubleshooting. I am now implementing it for an SSL-enabled HornetQ server with clustering. The SSL part looks easy as I can set the environment variables in the ‘org.springframework.jndi.JndiTemplate’ bean. Do you have any idea about clustering?

    Thanks in advance. Really appreciate it!

    • Hi Ann,
      I’m sorry but I’ve never used HornetQ and I can’t give you any advice about it. As I said, I used MessageQueue and I wrote an article about its configuration on cluster (Sun MessageQueue – Install and Run).
      MessageQueue uses a broker to dispatch messages and it can be used in a cluster environment. Good luck with your research.

  4. Hi Techannotation,

    I tried using the same code to connect to JMS topics but it doesn’t work. However, you state here that the same code should work. Is there any tweaking that has to be done?

    Thanks,
    Ann

  5. Thanks for the code! This really helped me. I am trying to make an executable jar out of the same and I get the following error:

    Exception in thread “main” org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/jms]
    INFO: Loading XML bean definitions from class path resource [spring-app.xml]
    Exception in thread “main” org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Unable to locate Spring NamespaceHandler for XML schema namespace [http://www.springframework.org/schema/jms]
    Offending resource: class path resource [spring-app.xml]

    Any idea why this can occur? I followed everything step by step

  7. Excellent Post!!! However, I am struggling to reconnect to the JMS server when it is rebooted. Any idea on how that can be done? There is also a live backup configured for the JMS server and I want JNDI to connect to that whenever the primary server drops off.
    Help will be greatly appreciated!
    Sandy

    • Hi Sandy,
      Unfortunately, I’ve never used a “master-slave” configuration like the one you’re asking about. I use a broker cluster; if a node fails, the other node takes the messages not accepted by the primary node. I’m sorry, I’m not able to answer your second question.

      Regarding the first question, I think you shouldn’t have any problems sending a message to a broker that has been rebooted. Every time, the code gets a connection from the connection pool, so if the pool is empty after the restart, it’ll be created again.
      Have you tried it?
      For receiving messages, however, I have some doubts about the synchronous receiver.

      I recommend you to read http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html.

      • Do you have a sample for the broker cluster server architecture? Will the asynchronous receiver change in any way with it? I am not sure that it should, but a sample would probably help me understand which direction I should go in.
