Mule 2.2.1 – Custom Transport

In this post, I’d like to show how to create a new transport connector for Mule.

In one of my projects I had to use a file connector to read a file system directory, but only within a given time range.

I could have used a Quartz connector to implement this behaviour, but I wanted more fine-grained control.

For this reason, I created a new custom transport, starting from the original File Connector transport.

Build new connector

As usual, you can find more information on the official Mule web site.

My aims were:

  • Read the file system directory only within a given time range
  • Limit the number of files in the process queue

Let’s start with the code.
From the Mule source files I duplicated the “fileconnector” folder (\mule-standalone-2.2.1\src\org\mule\transport\file) and changed FileMessageReceiver, which extends the org.mule.transport.AbstractPollingMessageReceiver class.

I added these fields to the class:

// Maximum number of files allowed in the queue directory
private long maxFilesNumber = 0;
// Process directory
private String processDir = null;
// Start of the valid time range
private String startTime = null;
// End of the valid time range
private String endTime = null;
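
How these fields get populated depends on how the connector hands the endpoint attributes to the receiver. A minimal sketch, assuming the attribute values arrive as a plain map of strings keyed by the XML attribute names (the MatrixFileSettings class and the fromProperties helper are hypothetical names for illustration, not Mule API):

```java
import java.util.Map;

/** Hypothetical holder for the four new settings; not a Mule class. */
final class MatrixFileSettings
{
    final long maxFilesNumber;
    final String processDir;
    final String startTime;
    final String endTime;

    private MatrixFileSettings(long maxFilesNumber, String processDir,
                               String startTime, String endTime)
    {
        this.maxFilesNumber = maxFilesNumber;
        this.processDir = processDir;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /**
     * Builds the settings from string properties. The keys are assumed to
     * match the attribute names declared in the XSD. Missing values keep
     * the same defaults as the field declarations (0 and null).
     */
    static MatrixFileSettings fromProperties(Map<String, String> props)
    {
        String max = props.get("maxFilesNumber");
        return new MatrixFileSettings(
            max == null ? 0L : Long.parseLong(max.trim()),
            props.get("processDirectory"),
            props.get("startTime"),
            props.get("endTime"));
    }
}
```

Parsing the numeric value once, at construction time, keeps the polling code free of string conversions.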

Then I changed the poll() method:

public void poll()
{
    try
    {
        if (!isRangeInclude())
        {
            return;
        }

        File[] files = this.listFiles();
        if (logger.isDebugEnabled())
        {
            logger.debug("Files: " + Arrays.toString(files));
        }
        Comparator comparator = getComparator();
        if (comparator != null)
        {
            Arrays.sort(files, comparator);
        }
        for (File file : files)
        {
            // don't process directories
            if (file.isFile())
            {
                this.processFile(file);
            }
        }
    }
    catch (Exception e)
    {
        this.handleException(e);
    }
}

At line 5 I added a call to the isRangeInclude method, which checks whether the execution time (the current time) is inside the range defined by the startTime and endTime fields.

The method definition is:

protected boolean isRangeInclude()
{
    // "HH" is the 24-hour clock; "hh" (12-hour) would read 12:30:00 as 00:30
    DateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    try
    {
        // Start time
        Calendar calStart = Calendar.getInstance();
        calStart.setTime(sdf.parse(startTime));

        // Stop time
        Calendar calEnd = Calendar.getInstance();
        calEnd.setTime(sdf.parse(endTime));

        // Compare as minutes since midnight (seconds are ignored)
        Calendar calendar = new GregorianCalendar();
        int todayTimeInt = calendar.get(Calendar.HOUR_OF_DAY) * 60 + calendar.get(Calendar.MINUTE);
        int startTimeInt = calStart.get(Calendar.HOUR_OF_DAY) * 60 + calStart.get(Calendar.MINUTE);
        int endTimeInt = calEnd.get(Calendar.HOUR_OF_DAY) * 60 + calEnd.get(Calendar.MINUTE);

        if (startTimeInt < endTimeInt)
        {
            // Range inside a single day, e.g. 08:00 to 18:00
            return (todayTimeInt > startTimeInt) && (todayTimeInt < endTimeInt);
        }
        // Range that crosses midnight, e.g. 23:30 to 18:45
        return (todayTimeInt > startTimeInt) || (todayTimeInt < endTimeInt);
    }
    catch (Exception e)
    {
        logger.error(e);
        return false;
    }
}
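
To see how the range behaves when it crosses midnight, the same check can be isolated as a pure function that receives the current time as a parameter instead of reading the system clock. This is only a sketch: the TimeWindow class and its method names are hypothetical, and treating the start as inclusive and the end as exclusive is a choice made here, not something the receiver requires.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/** Hypothetical helper, not part of Mule: time-of-day window check. */
final class TimeWindow
{
    /** Parses "HH:mm:ss" (24-hour clock) into minutes since midnight; seconds are dropped. */
    static int toMinutes(String time)
    {
        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
        format.setLenient(false); // reject values such as 25:00:00
        try
        {
            Calendar cal = Calendar.getInstance();
            cal.setTime(format.parse(time));
            return cal.get(Calendar.HOUR_OF_DAY) * 60 + cal.get(Calendar.MINUTE);
        }
        catch (ParseException e)
        {
            throw new IllegalArgumentException("Not a HH:mm:ss time: " + time, e);
        }
    }

    /** True when now lies in [start, end), where the window may wrap past midnight. */
    static boolean contains(String start, String end, String now)
    {
        int s = toMinutes(start);
        int e = toMinutes(end);
        int n = toMinutes(now);
        if (s <= e)
        {
            // Window inside a single day
            return n >= s && n < e;
        }
        // Window that crosses midnight: late evening OR early part of the day
        return n >= s || n < e;
    }
}
```

With a window from 23:30:00 to 18:45:00, a call such as TimeWindow.contains("23:30:00", "18:45:00", "20:00:00") falls in the gap between the end and the next start, so polling would be skipped at that time.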

As for the task “Limit the number of files in the process queue”, we have to update the processFile method. It’s enough to count the files in the folder and compare that number with your limit. In short:

...
if (fileCount<maxFilesNumber)
...
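
One way the count behind that comparison could be obtained is sketched below. It assumes the folder being limited is the process directory and that only regular files count toward the limit; the ProcessQueueLimit class and its methods are hypothetical names, not part of the File transport.

```java
import java.io.File;

/** Hypothetical helper, not part of Mule: caps the number of queued files. */
final class ProcessQueueLimit
{
    /** Counts regular files directly inside dir; returns 0 if dir cannot be listed. */
    static long countFiles(File dir)
    {
        File[] entries = dir.listFiles();
        if (entries == null)
        {
            // Not a directory, or an I/O error occurred while listing it
            return 0;
        }
        long count = 0;
        for (File entry : entries)
        {
            // Subdirectories do not count toward the limit
            if (entry.isFile())
            {
                count++;
            }
        }
        return count;
    }

    /** True when another file may be moved into processDir. */
    static boolean hasRoom(File processDir, long maxFilesNumber)
    {
        return countFiles(processDir) < maxFilesNumber;
    }
}
```

If the comparison is used exactly as written, the default maxFilesNumber of 0 never leaves room for a file, so the attribute has to be set on the endpoint for anything to be processed.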

Now you have to set the message receiver property in file.properties, under the folder META-INF/services/org/mule/transport/:

message.receiver=it.matrix.transport.matrixfile.MatrixFileMessageReceiver

After this, we have to add the new properties listed above as attributes in the XSD schema definition. You can find the schema under the META-INF folder; the template is inside the jar archive mule-transport-file-2.2.1.jar.

<xsd:attributeGroup name="inboundAttributes">
        ...
        <xsd:attribute name="processDirectory" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation>
                    Directory to process
                </xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>       
        <xsd:attribute name="maxFilesNumber" type="mule:substitutableLong">
            <xsd:annotation>
                <xsd:documentation>
                    Max numbers of files in directory
                </xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>       
        <xsd:attribute name="startTime" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation>
                    Start Time
                </xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>       
        <xsd:attribute name="endTime" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation>
                    End Time
                </xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        ...
    </xsd:attributeGroup>

The last steps are to build the jar of the new connector and copy it into the lib/user folder of our Mule installation.

Use new connector

To use the new connector in a new flow, we have to add the connector definition to the configuration, which first requires declaring its schema.

After this we can use it like a normal connector, with the new prefix. The resulting configuration:

<?xml version="1.0" encoding="UTF-8"?>
<mule
       ...
       xmlns:matrixfile="http://www.mulesource.org/schema/mule/matrixfile/2.2"
       ...
    xsi:schemaLocation="
       ...
       http://www.mulesource.org/schema/mule/matrixfile/2.2 http://www.mulesource.org/schema/mule/matrixfile/2.2/mule-matrixfile.xsd
       ...">

  <matrixfile:connector name="MatrixFileConnectorInbound"
      pollingFrequency="30000" streaming="true">
    <matrixfile:expression-filename-parser />
  </matrixfile:connector>

  <model>

    <service name="Matrix.File">
      <inbound>
        <matrixfile:inbound-endpoint connector-ref="MatrixFileConnectorInbound"
            path="/in" moveToDirectory="/backup" fileAge="300000"
            processDirectory="/process"
            maxFilesNumber="5"
            startTime="23:30:00"
            endTime="18:45:00" />
      </inbound>
      <echo-component/>
      <outbound>
        <multicasting-router>
          ...
        </multicasting-router>
      </outbound>
    </service>
  </model>
</mule>

That’s all. My process will run from 23:30 until 18:45 the next day, and my queue will be limited to 5 files in the folder.
