Introduction to Apache Kafka using Spring

A frequently common problem today it’s to deal with big data that makes you adopt of different system in order to achieve the result of processing large data.

One of the product that makes it possible is Apache Kafka, and I’m going to speak about it in this post.

Apache Kafka is a Message Broker such as MessageQueue system (e.g. ActiveMQ, HornetQ, etc..) and it makes possible to persist message and distributes it to the all cluster members.
Ok, what’s new? By the all contributions and motivations you can find over internet, I think the best answer at the question “Why should I use Kafka instead of MessageQueue” is that with relative low resource it makes possible to deal with very big data, better than a common MessageQueue system.
Aren’t you still convinced? Have a look at this “PowerBy” section at Kafka website or have a look at Jay Kreps post.

Having seen that, I don’t think you need my opinion to think that’s worthwhile spending some hours to look at.

Let’s start with the sample project. I simulate a flight monitor system where, every airplane, sends the position information to a central system (Apache Kafka) who collects them in a MongoDb.
The diagram is the following:

apacheKafka

Now, Brokers and ZooKeeper are Kafka parts. The first accept the messages which come from the topics (it’s the same concept of the queues in Message Queues) and ZooKeeper orchestrates the Brokers in Kafka.

My solution includes Spring integration Kafka project available here. It helped me to configure producer and consumer by using xml configuration files.

Let’s start with the Producer (or Outbound process).
outbound
And this is the configuration file


	<int:publish-subscribe-channel id="inputToKafka" />

	<int-kafka:outbound-channel-adapter kafka-producer-context-ref="kafkaProducerContext" auto-startup="true" channel="inputToKafka" order="1">
	</int-kafka:outbound-channel-adapter>

	<bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="topic.metadata.refresh.interval.ms">3600000</prop>
				<prop key="message.send.max.retries">5</prop>
				<prop key="send.buffer.bytes">5242880</prop>
			</props>
		</property>
	</bean>

	<bean id="flightEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
		<constructor-arg value="it.demo.kafka.springkafka.bean.Flight" />
	</bean>

	<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
		<int-kafka:producer-configurations>
		
			<!-- Flight U22552 -->
			<int-kafka:producer-configuration broker-list="localhost:9092" key-class-type="java.lang.String" value-class-type="it.demo.kafka.springkafka.bean.Flight" value-encoder="flightEncoder" topic="U22552" compression-codec="none" />
				
			<!-- Flight U22780 -->
			<int-kafka:producer-configuration broker-list="localhost:9092" key-class-type="java.lang.String" value-class-type="it.demo.kafka.springkafka.bean.Flight" value-encoder="flightEncoder" topic="U22780" compression-codec="none" />
				
		</int-kafka:producer-configurations>
	</int-kafka:producer-context>

As you can see, I’ve defined two different topic based on the flight Number (“U22552” and “U22780”).
The Java code that accept the flight position is a simple REST service.

@Controller
public class ProcessController {

@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;

private static final Log logger = LogFactory.getLog(ProcessController.class);

@RequestMapping(method = RequestMethod.POST, value = "/publish")
public @ResponseBody Flight getTrainsJson(Flight flightRequest) {

logger.info("Flight: " + flightRequest);

channel.send(MessageBuilder.withPayload(flightRequest)
.setHeader("topic", flightRequest.getNumber()).build());

return flightRequest;
}
}

And this is the Flight bean.

@Document
public class Flight {

private String number;
private double latitude;
private double longitude;

public Flight(){}

public Flight(String number, double latitude, double longitude) {
this.number = number;
this.latitude = latitude;
this.longitude = longitude;
}

public String getNumber() {
return number;
}

public void setNumber(String number) {
this.number = number;
}

public double getLatitude() {
return latitude;
}

public void setLatitude(double latitude) {
this.latitude = latitude;
}

public double getLongitude() {
return longitude;
}

public void setLongitude(double longitude) {
this.longitude = longitude;
}

@Override
public String toString() {
return "number = " + number + "latitude = " + latitude + "longitude = " + longitude;
}
}

Once the /publish rest is invoked a new flight position is put into the topic.

Now the Consumer (or Inbound process).
inbound

And its relative configuration file.


	<int:channel id="inputFromKafka">
		<int:queue />
	</int:channel>

	<int:service-activator input-channel="inputFromKafka" ref="messageProcessor">
		<int:poller fixed-rate="500" />
	</int:service-activator>

	<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="auto.offset.reset">smallest</prop>
				<prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
				<prop key="fetch.message.max.bytes">5242880</prop>
				<prop key="auto.commit.interval.ms">1000</prop>
			</props>
		</property>
	</bean>


	<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000" />


	<bean id="kafkaThreadListener" class="it.demo.kafka.springkafka.listener.KafkaConsumerStarter" init-method="initIt" destroy-method="cleanUp" />

	<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext" auto-startup="false" channel="inputFromKafka" id="kafka-inbound-channel-adapter">
		<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" />

	</int-kafka:inbound-channel-adapter>

	<!-- Consumer -->
	<bean id="flightDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
		<constructor-arg value="it.demo.kafka.springkafka.bean.Flight" />
	</bean>

	<int-kafka:consumer-context id="consumerContext" consumer-timeout="4000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
		<int-kafka:consumer-configurations>
			<int-kafka:consumer-configuration group-id="default" max-messages="50" value-decoder="flightDecoder">
				<int-kafka:topic id="U22552" streams="1" />
				<int-kafka:topic id="U22780" streams="1" />
			</int-kafka:consumer-configuration>
		</int-kafka:consumer-configurations>
	</int-kafka:consumer-context>

The Java code used by the Consumer process it’s a class that implements ApplicationContextAware interface. The code starts when the application run too.

public class KafkaConsumerStarter implements ApplicationContextAware
{
private ApplicationContext appContext;

private SourcePollingChannelAdapter kafkaInboundChannelAdapter;

private KafkaConsumerContext kafkaConsumerContext;

public void initIt() throws Exception
{
kafkaInboundChannelAdapter =
appContext.getBean("kafka-inbound-channel-adapter", SourcePollingChannelAdapter.class);
kafkaInboundChannelAdapter.start();

kafkaConsumerContext =
appContext.getBean("consumerContext", KafkaConsumerContext.class);
}
public void cleanUp() throws Exception
{
if (kafkaInboundChannelAdapter != null)
{
kafkaInboundChannelAdapter.stop();
}

Thread.sleep(1000);

Metrics.defaultRegistry().shutdown();
}
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException
{
this.appContext = applicationContext;
}
}

Every 0.5 seconds, the Consumer tries to get new messages from the topic. The bean MessageProcessor processes the messages.

@Component
public class MessageProcessor {

@Autowired
MongoDao dao;

@SuppressWarnings("unchecked")
@ServiceActivator
public <K, V> void processPayment(Map<K, V> payload) {
String key = null;
for(K item : payload.keySet()) {
key = (String) item;
}

Map<K, V> topic = (Map<K, V>)payload.get(key);
List order = (List)topic.get(0);
order.forEach((v) -> dao.insertFlight(v));}
}

The Mango Bean is the Dao connector with the MongoDB.

public class MongoDao {

private MongoTemplate mongoTemplate;

private static final String FLIGHTS = "flights";

public void insertFlight(Flight flight) {

if (!getMongoTemplate().collectionExists(FLIGHTS)) {
getMongoTemplate().createCollection(FLIGHTS);
}

getMongoTemplate().insert(flight, FLIGHTS);
}

public MongoTemplate getMongoTemplate() {
return mongoTemplate;
}

public void setMongoTemplate(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
}

The MangoDb configuration file:

	<mongo:db-factory id="mongoDbFactory" host="localhost" port="27017" dbname="kafkaflights"/>
    
    <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
		<constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
	</bean>

    <mongo:repositories base-package="it.demo.kafka.springkafka.repo" mongo-template-ref="mongoTemplate" /> 
	<bean class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor" />
  
	<bean id="dao" class="it.demo.kafka.springkafka.dao.MongoDao">
		<property name="mongoTemplate" ref="mongoTemplate" />
	</bean>

Before running the code it’s needful installing and running the Kafka Broker.
Once downloaded and unzipped the binary file from here we need to complete two others steps.

The first is running the ZooKeeper process by typing (I’m using Windows):
bin\windows\zookeeper-server-start.bat config/zookeeper.properties

The second step is running the Broker:
bin\windows\kafka-server-start.bat config/server.properties

I’ve used the default configurations with only one broker and not clustered. More info are available at the official documentation.

Finally, the libraries versions that I’ve used for this example.

  • Spring version 4.1.0.RELEASE;
  • Spring integration Kafka version 1.0.0.M2;

The complete solution is available at https://github.com/MarcoGhise/SpringKafka

Advertisements

31 thoughts on “Introduction to Apache Kafka using Spring

  1. Pingback: Introduction to Apache Kafka using Spring – sendilsadasivam

  2. Hi, thank you for this article. I downloaded the solution and import it to eclipse. In MessageProcessor.java line 39, below error is shown:

    Multiple markers at this line
    – v cannot be resolved to a variable
    – Syntax error on token “-“, — expected
    – v cannot be resolved to a variable

    Also, please help to explain “->” syntax in java. Eclipse see this as syntax error. Haven’t seen this before, only in Kafka-spring projects I donwload. Thank you and have a good day.

    • Hi.
      Upgrading your Jdk version at 8 you’ll get the correct line interpretation.
      What you’re watching are named “Lambda expression” introduced in the last Jdk version.

      Enjoy it

  3. Hi,

    While running the given code, I got the below error :

    cvc-complex-type.2.4.c: The matching wildcard is strict, but no declaration can be found for element ‘int:channel’.

    The below line of configuration xml is causing the problem :

    Do you have any idea about the probable resolution of the problem ?

    Thanks

    • Hi Dinesh.
      Very strange…can you please check the dependency on your maven repository?
      You should have the spring-integration-kafka-1.0.0.M2.jar dependency under the folder \org\springframework\integration\spring-integration-kafka\1.0.0.M2.
      Is it correctly downloaded?

    • Hi.
      When did you get that error? Are you running Zookeper and the Broker? Did you create the topic? Let me know more information, I’m not able to understand the problem.

    • Hi.
      Sometimes I got the error about the kafkaInboundChannelAdapter which hadn’t been stopped before the system shoutdown.
      I found the thread’s sleep as a workaround of the problem.

    • Hi Vanita.
      Follow these instruction:
      – Start ZooKeeper and Broker as described at the bottom of the post;
      – Run MongoDb at single instance (look at kafka-dao.xml for the configuration parameters)
      – Deploy the solution as WebApplication and then browse to http://localhost:8080/SpringKafka/insertflight.html;
      – Fill the form with Flight Number either U22552 or U22780 and a long values for lat and long, then press “Registra”.
      – Have a look at Mongo kafkaflights Db and flights collection to check your flight correctly inserted.
      Hope this can help you.

      I’ve just checked my source code and, unfortunately, I used that for test other solutions, so, it could seem a little bit confused.
      I’m sorry for that.

  4. Hi,
    while running consumer on multiple multiple machines ,only a single consumer recieves messages and not the others.
    There is a single topic with multiple partitions.group.id is same for all the consumer instances.
    How to subscribe consumer to a group?

  5. hi,
    is it necessary to specify the topic in producer configuration.I am able to send data to different topics without specifying topic name in producer configuration

  6. Hi,
    I am facing the following issue ,destroy method is never called.and even if i undeploy the consumer it continues to take messages

    • Hi Philanthropist.
      I’ve just reproduced the use case using the Junit test Inbound and Outbound under src/test.

      The steps are:
      1. Start Zookeeper server;
      2. Start Kafka broker;
      3. Start MongoDB;
      4. Start Outbound Junit test; it sends 100,000 messages to Kafka server for topic U22552.
      5. Start the Inbound Junit test; check the MongoDb collection (or add a log under insertFlight class) and you can see the messages.

      I don’t use the Kafka-template property to configure the adapter. Honestly, I’m not sure that it could be the problem.

    • Hi.
      Unfortunately your example is relative of Kafka Spring integration version 2.
      I haven’t still got the time to migrate my example at the new version. I guess core part has been changed. I’m sorry but I can’t help you soon.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s