Log processor with Spring Cloud Stream

Spring Cloud Stream makes it easier to distribute computation across different microservices using a message-driven approach.

Using a pattern based on source, processor and sink, we can build a pipeline that solves a common problem such as log processing. This is what I'm going to do in this article.

Starting from the official project page, we can get an overview of what Spring Cloud Stream is about.

Basically, the pipeline looks something like this:

[Pipeline diagram: Source → Processor → Sink]

Where the blocks are defined as:

  • Source: annotated with @EnableBinding(org.springframework.cloud.stream.messaging.Source.class); this is the entry point of the pipeline.
  • Process: annotated with @EnableBinding(org.springframework.cloud.stream.messaging.Processor.class); this is where the business logic of the pipeline is applied.
  • Sink: annotated with @EnableBinding(org.springframework.cloud.stream.messaging.Sink.class); this is the point where the data from the previous block is consumed and, typically, persisted to some storage (a minimal sketch of a sink follows below).
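
To make the annotations concrete, here is a minimal sketch of a sink, assuming the Spring Cloud Stream 1.x annotation model; the class name LogLineSink and the handler body are mine, not part of the original project:


@EnableBinding(Sink.class)
@SpringBootApplication
public class LogLineSink {

	// Invoked for every message arriving on the sink's input channel
	@StreamListener(Sink.INPUT)
	public void handle(String line) {
		System.out.println("Received: " + line);
	}

	public static void main(String[] args) {
		SpringApplication.run(LogLineSink.class, args);
	}
}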

How are the blocks held together in the pipeline? What's the glue that makes it possible?

Well, the blocks are bound together through a message broker for which a Spring Cloud Stream "binder" is available. I used Apache Kafka as the message broker.

Going back to the example, we want to process an Apache access log file (access_log) and keep a real-time count of the URLs accessed.

The first task is to build the source, whose aim is to read the file, as a stream, from a directory.

Have a look at the official Spring Cloud Stream modules repository (https://github.com/spring-cloud/spring-cloud-stream-modules): a huge number of modules are available for the different types of blocks.

That repository also gives a good summary of the available components.

I've used the file-source module as the "Source" of the project, configuring these parameters:


#Directory source file
file.directory=/filelogkafka-source/input/

#Polling delay
trigger.fixed-delay=20

#Unit time for polling delay
trigger.time-unit=SECONDS

#Split the file into lines
file.consumer.mode=lines

Every 20 seconds the directory /filelogkafka-source/input/ is polled, looking for new files to process.

Every line generates a new message which will be broadcast by Apache Kafka; I'll come back to Kafka later.

The source code of this service is the same as that posted in the file-source module repository (https://github.com/spring-cloud/spring-cloud-stream-modules/tree/master/file-source/src/main/java/org/springframework/cloud/stream/module/file/source), so I won't copy it here.
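
For orientation only, here is a hedged sketch of what such a file source roughly looks like, built on Spring Integration's FileReadingMessageSource; the class name and poller value are illustrative, and the real module additionally splits each file into lines when file.consumer.mode=lines:


@EnableBinding(Source.class)
@SpringBootApplication
public class FileSourceSketch {

	// Polls the input directory and emits one message per new file found;
	// the real module then runs the file through a splitter to emit one message per line.
	@Bean
	@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "20000"))
	public MessageSource<File> fileReadingMessageSource() {
		FileReadingMessageSource source = new FileReadingMessageSource();
		source.setDirectory(new File("/filelogkafka-source/input/"));
		return source;
	}

	public static void main(String[] args) {
		SpringApplication.run(FileSourceSketch.class, args);
	}
}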

The next step is to build the process node; in this service I parse each line into an ApacheLog bean.


public class ApacheLog {

  private String ip;
  private String date;
  private String method;
  private String url;
  private String protocol;
  private String httpCode;
  ...
}

The class to parse the log file:


import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegExUtil {

	private final static String REGEX = 
"^(\\S+)\\s-\\s-\\s\\[(\\S+)\\s\\+\\d+\\]\\s\"(GET|POST)\\s(\\S+)\\s(\\S+)\"\\s(\\d+)\\s\\d+$";

	private Pattern r;
	
	public RegExUtil()
	{
		// Create a Pattern object		
		r = Pattern.compile(REGEX);
	}
	
	public ApacheLog getLog(String logLine) {
		
		// Now create matcher object.
		Matcher m = r.matcher(logLine);
		ApacheLog log = new ApacheLog();

		if (m.find()) {

			log.setIp(m.group(1));
			log.setDate(m.group(2));
			log.setMethod(m.group(3));
			log.setUrl(m.group(4));
			log.setProtocol(m.group(5));
			log.setHttpCode(m.group(6));
		}

		return log;
	}
}
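
As a quick check, this is how the parser behaves on a hypothetical sample line (the line and the getter calls are only for illustration, assuming the usual getters behind the "..." of the bean):


RegExUtil util = new RegExUtil();
ApacheLog entry = util.getLog(
	"10.0.0.1 - - [26/Apr/2000:00:23:48 +0000] \"GET /robots.txt HTTP/1.0\" 200 68");
// entry.getMethod()   -> "GET"
// entry.getUrl()      -> "/robots.txt"
// entry.getHttpCode() -> "200"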

And, finally, the class annotated with @EnableBinding:


@EnableBinding(Processor.class)
@SpringBootApplication
@Import(RegExUtil.class)
public class TimeProcessorApplication {

	private static Logger logger = 
                LoggerFactory.getLogger(TimeProcessorApplication.class);
	
	@Autowired
	RegExUtil regExUtil;
	
	@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
	public ApacheLog transformApacheLog(String logLine) {

		logger.info("Log line: " + logLine);
		
		return regExUtil.getLog(logLine);
	}

	public static void main(String[] args) {
		SpringApplication.run(TimeProcessorApplication.class, args);
	}
}

Once the message has been parsed, the last service activated is the aggregating counter sink.
Again, I used the code from the stream modules repository (https://github.com/spring-cloud/spring-cloud-stream-modules/tree/master/field-value-counter-sink/src/main/java/org/springframework/cloud/stream/module/metrics), updating the class FieldValueCounterSinkConfiguration in order to print the report to the console.

Here is the source code; my additions are the logging statements at the end of the processValue method.


@EnableBinding(Sink.class)
@Import(FieldValueCounterSinkStoreConfiguration.class)
public class FieldValueCounterSinkConfiguration {

	private static Logger logger = 
LoggerFactory.getLogger(FieldValueCounterSinkConfiguration.class);
	
	@Autowired
	private FieldValueCounterSinkProperties fvcSinkProperties;

	@Autowired
	private FieldValueCounterWriter fieldValueCounterWriter;

	@Autowired
	private FieldValueCounterReader fieldValueCounterReader;
	
	private final JsonToTupleTransformer jsonToTupleTransformer = 
new JsonToTupleTransformer();

	@ServiceActivator(inputChannel=Sink.INPUT)
	public void process(Message<?> message) {
		Object payload = message.getPayload();
		if (payload instanceof String) {
			try {
				payload = jsonToTupleTransformer.transformPayload(payload.toString());
			}
			catch (Exception e) {
				throw new MessageTransformationException(message, e.getMessage(), e);
			}
		}
		if (payload instanceof Tuple) {
			processTuple(computeMetricName(message), (Tuple) payload);
		}
		else {
			processPojo(computeMetricName(message), payload);
		}
	}

	private void processPojo(String counterName, Object payload) {
		BeanWrapper beanWrapper = new BeanWrapperImpl(payload);
		if (beanWrapper.isReadableProperty(fvcSinkProperties.getFieldName())) {
			Object value = 
beanWrapper.getPropertyValue(fvcSinkProperties.getFieldName());
			processValue(counterName, value);
		}
	}

	private void processTuple(String counterName, Tuple tuple) {
		String[] path = 
StringUtils.tokenizeToStringArray(fvcSinkProperties.getFieldName(), ".");
		processValueForCounter(counterName, tuple, path);
	}

	private void processValueForCounter(String counterName, Object value, String[] path) {
		String key = path[0];
		Object result = null;
		if (value instanceof List) {
			for (Object item : (List<?>) value) {
				processValueForCounter(counterName, item, path);
			}
		}
		else if (value instanceof Tuple) {
			Tuple t = (Tuple) value;
			if (t.hasFieldName(key)) {
				result = t.getValue(key);
			}
		}
		else if (value instanceof Map) {
			result = ((Map<?, ?>) value).get(key);
		}
		if (result != null) {
			if (path.length == 1) {
				processValue(counterName, result);
			}
			else {
				path = Arrays.copyOfRange(path, 1, path.length);
				processValueForCounter(counterName, result, path);
			}
		}
	}

	protected void processValue(String counterName, Object value) {
		if ((value instanceof Collection) || ObjectUtils.isArray(value)) {
			Collection<?> c = (value instanceof Collection) ? (Collection<?>) value
					: Arrays.asList(ObjectUtils.toObjectArray(value));
			for (Object val : c) {
				fieldValueCounterWriter.increment(counterName, val.toString(), 1.0);
			}
		}
		else {
			fieldValueCounterWriter.increment(counterName, value.toString(), 1.0);
		}
		
		/*
		 * Print the data collected so far for this counter
		 */
		FieldValueCounter counter = fieldValueCounterReader.findOne(counterName);

		Map<String, Double> result = counter.getFieldValueCounts();

		logger.info("");
		logger.info("************");
		result.forEach((k,v)->logger.info("Url : " + k + " Count : " + v));
		logger.info("************");
		logger.info("");
		

	}

	protected String computeMetricName(Message<?> message) {
		return fvcSinkProperties.getComputedNameExpression()
.getValue(message, CharSequence.class).toString();
	}


}

In the application properties file I point the counter at the url property of the ApacheLog POJO and name the counter "Access".


#Name of the counter
field-value-counter.name=Access

#Field to look up in the pojo
field-value-counter.field-name=url
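
Under the hood the sink resolves this field through Spring's BeanWrapper, as shown in the processPojo method above; a purely illustrative example of that lookup (assuming ApacheLog exposes the usual getter and setter for url):


ApacheLog log = new ApacheLog();
log.setUrl("/robots.txt");

BeanWrapper wrapper = new BeanWrapperImpl(log);
// readable because ApacheLog exposes a getter for "url"
if (wrapper.isReadableProperty("url")) {
	Object value = wrapper.getPropertyValue("url"); // "/robots.txt"
	// processValue(...) then increments the counter for this value
}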

And the result looks like this:

************
Url : /selfcare/extauth/loginPage.do?mn=1&azione=entraVL&service_id=myblog Count : 2.0
Url : /selfcare/ga/gauPanel.do?entryPoint=hpcommunity Count : 6.0
Url : /selfcare/images/gau/list_freccia.gif Count : 5.0
Url : /light/entra?email=mark.denis%40google.com&ca=407727 Count : 1.0
Url : /fol/verifyLogin.do?urlRitorno=http://mail.my.it/SSOLogin Count : 1.0
Url : /selfcare/images/extauth/rotorLayer.gif Count : 3.0
Url : /album/fe/wim/form\r Count : 6.0
Url : /robots.txt Count : 1.0
Url : /fol/verifyLogin.do?urlRitorno=http://my.community.it/ Count : 1.0
Url : /fol/verifyLogin.do?urlRitorno=http://mail.my.it/SSO/default.jsp Count : 20.0
************

So far, I've shown how to build and configure the services. Now it's time to have a look at the binding part: the message broker, for which I chose Apache Kafka.

Adding Apache Kafka, the initial picture changes: the broker now sits between the blocks, carrying the messages from one to the next.

This is not the post where I'm going to describe how Kafka works. I wrote an article about Kafka almost two years ago; you can find more information here: https://techannotation.wordpress.com/2015/10/26/introduction-to-apache-kafka-using-spring/.

So, each message (a log line) goes through the pipeline, received and dispatched by Apache Kafka as the services request it.

All of this is configured in the application.properties files. First of all, the services must connect to the same Kafka instance, so every file contains this configuration:


spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

Once the broker is configured, each file has its own bindings. For the Source service they are:


spring.cloud.stream.bindings.input.group=foo
spring.cloud.stream.bindings.output.destination=fromfile

For the Process service they are:


spring.cloud.stream.bindings.input.group=foo

spring.cloud.stream.bindings.input.destination=fromfile
spring.cloud.stream.bindings.output.destination=tosink

And, finally, for the Sink service they are:


spring.cloud.stream.bindings.input.group=foo 

spring.cloud.stream.bindings.input.destination=tosink

As you can see, the consumer group is the same for every service. Within a group, each message published to a destination is delivered to only one consumer instance, so scaling a service out doesn't duplicate the processing.

Pay attention to the configuration spring.cloud.stream.kafka.bindings.input.consumer.startOffset=earliest.

This tells the consumer (through the Spring Kafka binder) to start from the earliest available offset, so it also picks up messages that haven't been consumed yet.
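
For reference, this is how the setting sits in the consumer-side application.properties, next to the destination bindings shown above:


#Consume from the earliest available offset
spring.cloud.stream.kafka.bindings.input.consumer.startOffset=earliest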

Now it's time to deploy the solution, and we have different options: we can deploy it on a Spring Cloud Data Flow server or run it on the Cloud Foundry platform.

For my purposes, I decided to run the services as local applications launched from the command line. In a future article I'm going to describe what deploying this solution on a Data Flow server involves, but not this time.

The steps are:

  • Running the Kafka ZooKeeper server;
  • Starting the Kafka broker;
  • Starting the Source, the Process and the Sink services, in any order (example commands below).
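
On a local installation this boils down to something like the commands below; the two scripts come from a standard Kafka distribution, while the jar names are purely illustrative:


# From the Kafka installation directory
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# The three Spring Boot services (jar names are illustrative)
java -jar file-source.jar
java -jar log-processor.jar
java -jar counter-sink.jar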

Put an Apache log file into the file.directory specified above and you get your aggregation.

Source code at GitHub (https://github.com/MarcoGhise/LogProcessorCloudStream)
