Performance benchmark for Spring Integration

I was looking for any official performance benchmark for Spring Integration framework,

however was unable to find.

I decided to create my own in order to better understand the framework’s performance ability.

I measured my test on a physical server that has 8 CPU’S.

The producers and the consumers are in the same network on different JVM’SMachines.

The application included two Direct (Queue’s) channels:

Producer1 – > Queue1 -> Consumer1 -> Producder2-> Queue2 -> Consumer2

The message is 6 characters String converted to a ByteArray.

The message measurement starting point at Producder1

For executing 3 million messages:

1. Throughput: ~10,000 m/s

2. Latency:     ~0.1244 ms

Pretty good results compared to the current MQ’s market,


Related Articles:


Aggregating async results using Spring Integration


I came across an issue which has very nice solution using Spring Integration.

Many times we have the need for a scenario of dispatching a message to unknown number of destinations.
For this purpose we have the Topic methodology.

But some times in addition we also want to receive answers from all destinations which received the message and aggregate them to to a single result answer.

For this purpose we can use channels combined with Aggregator and ReleaseStrategy interfaces.

In this post I wont concentrate on the “channels implementation”

So let’s say we have a producer who sends it’s message to a Topic.

Now we have a consumer which receives that message.

Using gateway and a processor interface we can send that message in any type we want:

public interface Processor
    public void sendResponse(String response);

Consumer code:

@Override public void onMessage(Message message)
  String resultMessage = "";
    catch (Exception e)
         log.error("Error while processing message in channel consumer. errorMsg=" + e.getMessage(), e);

Now the message will be delivered to a channel(“In Channel”) We can add to this message an extra information(in case we have different message groups). After adding an extra information we dispatch that message to another channel (“Out channel”) Now here is the magic: We create two pojo’s that later will be bind to interfaces using the XML configuration. ReleaseStrategy:

public class ReleaseStrategy
	public boolean canRelease(List results)
            // check if all 5  subscribers sent responses
		if (results.size() == 5)

			return true;
		return false;


public class Aggregator
	public String aggregate(List results)
               String finalResult= "SUCCESS_RESULT";
		for (String result: results) {

			if (result.equals("ERROR_RESULT")) {
				finalResult= "ERROR_RESULT";

		return finalResult;

Basically what happens here is that after we return a “true” value via the canRelease method of ReleaseStrategy Interface the Aggregator will be able to receive the aggregated message and dispatch single result to the final destination(could be a Queue where another message consumer will get and process the result) Xml configuration:

< ?xml version="1.0" encoding="UTF-8"?>


Related Articles:


Accessing Jboss JMX programmatically


The next example will explain how to retrieve attributes from the JMX console via java.

The reason why you want to access the JMX console would be the need of having values which you wont be able to retrieve via the common API.

For example if we want to get the current subscribers of a specific Topic/Queue resource for any purpose
we wont be able to get it via the common JMS API.

We need to access to the JMX(locally or remotely) and read the attribute we desire.

Some code:

An example of connecting to the JMX and accessing the current number of subscribers which listening to a Topic:

public static int getCurrentTopicSubscribers()

		//use this code when you are connecting remotely(outside the jboss container).
//			Hashtable ht = new Hashtable();
//			ht.put(Context.INITIAL_CONTEXT_FACTORY,
//					"org.jnp.interfaces.NamingContextFactory");
//			ht.put(Context.PROVIDER_URL, "localhost:1099");
//			ht.put(Context.SECURITY_PRINCIPAL, "admin");
//			ht.put(Context.SECURITY_CREDENTIALS, "admin");
//			System.out.println("nt 1- Gotting InitialContext...... ");
//			Context ctx = new InitialContext(ht);

                       //(Connection within the jboss container)
		 MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).get(0);
			server = (MBeanServerConnection) ctx.lookup("jmx/invoker/RMIAdaptor");
			if (server != null)
				log.error("Couldnt connect to JMX server");
		catch (Exception e)
			log.error("Couldnt connect to JMX server");

After we have connection we can easily retrieve the attributes from the JMX:

public static void monitorJMS() throws Exception
		ObjectName objectName = new ObjectName(
		System.out.println("DurableMessageCount = "
				+ (Integer) server.getAttribute(objectName, new String("DurableMessageCount")));
		System.out.println("AllSubscriptionsCount = "
				+ (Integer) server
						.getAttribute(objectName, new String("AllSubscriptionsCount")));


Related Articles:


Using JMS in Tomcat

Hi all,
Recently I had to use JMS topic in my application. problem was I was working with Tomcat web application.
Tomcat doesnt ship with JMS capabitilites but you can configure and use external JMS and plug it into Tomcat.

There are couple of JMS vendors. I chose to use ActiveMQ.

So first you need to add the following jars to your tomcat lib dir:


Next, add the resource below to conf/server.xml:

next, add this code to context.xml:

Next, We need to add ActiveMQ props into tomcat class path:

-Dwebconsole.jmx.url=service:jmx:rmi:///jndi/rmi://localhost:1099/ jmxrmi

In order to start and init our JMS queue we need to add the following code to some class and start it as service so it will get started on server startup:

BrokerService broker = new BrokerService();
// configure the broker

//use broker.stop() to stop the service.

Now let’s create small application to demonstrate some Topic movemonet:


InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup(“java:comp/env”);
ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup(“jms/ConnectionFactory”);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(“jms/topic/MyTopic”);
MessageProducer producer = session.createProducer(destination);
TextMessage msg=session.createTextMessage();
msg.setText(“Message sent”);

(Now We create some Client in order to retrieve any message which will be dispatch onto our JMS topic)

public class Reciever {

protected Topic queue;

protected String queueName = “jms/topic/MyTopic”;

protected String url = “tcp://localhost:61616″;

protected int ackMode = Session.AUTO_ACKNOWLEDGE;

public static void main(String[] args) {
Reciever rec=new Reciever();
try {;
} catch (Exception e) {


public void run() throws JMSException{

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
TopicConnection connection = (TopicConnection)connectionFactory.createTopicConnection();

MessageConsumer consumer = null;
Session session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createTopic(queueName);
consumer = session.createConsumer(queue);

System.out.println(” Waiting for message (max 5) “);

for (int i = 0; i < 5; i++) {
Message message = consumer.receive();


System.out.println(“Closing connection”);

public void processMessage(Message message) {

try {

TextMessage txtMsg = (TextMessage) message;

System.out.println(“Received a message: ” + txtMsg.getText());

} catch (Exception e) {




Related Articles: