Amazon SQS and Spring for message queuing

This post demonstrates how to use Spring's JmsTemplate and message listener containers together with the Amazon SQS API in order to set up a message queue.

Why would I use Amazon SQS?

1.  Easy to configure

2. Cross-platform support

3. It spares you redundancy, congestion and scaling worries.

Why wouldn’t I use Amazon SQS?

1. If your latency requirement is below ~20 ms

2. It costs ~$0.00005 per message
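
To put the per-message price in perspective, here is a rough back-of-the-envelope sketch. It only multiplies a message count by the approximate figure quoted above; the class and method names are made up for illustration, and real AWS pricing may differ.

```java
import java.math.BigDecimal;

// Hypothetical helper for illustration; not part of any AWS or Nevado API.
class SqsCostEstimate {
    // Approximate price per message quoted in this post (an estimate, not official pricing).
    static final BigDecimal PRICE_PER_MESSAGE = new BigDecimal("0.00005");

    // Returns the estimated cost in dollars for the given number of messages.
    static BigDecimal estimate(long messages) {
        return PRICE_PER_MESSAGE.multiply(BigDecimal.valueOf(messages));
    }

    public static void main(String[] args) {
        // One million messages at ~$0.00005 each comes to about $50.
        System.out.println(estimate(1_000_000L).stripTrailingZeros().toPlainString());
    }
}
```

So the cost only starts to matter once you push millions of messages through the queue.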

I came across a nice open source project called Nevado, which wraps the Amazon SQS API in a very neat way.

Add this dependency to your Maven pom.xml:

<dependency>
 <groupId>org.skyscreamer</groupId>
 <artifactId>nevado-jms</artifactId>
 <version>1.2.4</version>
</dependency>

Now let’s configure Spring beans to integrate nicely with AmazonSQS:

1. Connection factory:

<bean id="sqsConnectorFactory" class="org.skyscreamer.nevado.jms.connector.amazonaws.AmazonAwsSQSConnectorFactory"/>

Note that the connection factory below needs aws.accessKey and aws.secretKey to be set. You get those keys from the Amazon SQS account portal:

<bean id="connectionFactory" class="org.skyscreamer.nevado.jms.NevadoConnectionFactory">
 <property name="sqsConnectorFactory" ref="sqsConnectorFactory"/>
 <property name="awsAccessKey" value="${aws.accessKey}"/>
 <property name="awsSecretKey" value="${aws.secretKey}"/>

</bean>

2. Create the queue:

<bean id="myQueue" class="org.skyscreamer.nevado.jms.destination.NevadoQueue">
 <constructor-arg value="${aws.sqs.queue.name}"/>
</bean>

3. Create the JmsTemplate (it will be injected into the code later to send messages). It uses the cachedConnectionFactory bean defined further down:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
 <property name="defaultDestinationName" value="${aws.sqs.queue.name}"/>
 <property name="connectionFactory" ref="cachedConnectionFactory"/>
</bean>

4. Add listeners:

4.a I am using SimpleMessageListenerContainer, which has the ability to cache connections, run concurrent consumers, set an error handler and more. Note that the errorHandler property expects a bean that implements org.springframework.util.ErrorHandler.

<bean id="simpleMessageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
 <property name="connectionFactory" ref="connectionFactory"/>
 <property name="messageListener" ref="listener"/>
 <property name="destination" ref="myQueue"/>
 <property name="errorHandler" ref="amazonMessageListener"/>
 <property name="concurrency" value="20"/>
 <property name="taskExecutor" ref="listenerThreadPoolTaskExecutor"/>
</bean>

4.b Let’s add a thread pool for the listener’s executors:

<bean id="listenerThreadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
 <property name="corePoolSize" value="70"/>
 <property name="maxPoolSize" value="70"/>
 <property name="daemon" value="true"/>
 <property name="keepAliveSeconds" value="60"/>
</bean>

4.c Add caching connection support:

<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
 <property name="targetConnectionFactory" ref="connectionFactory"/>
 <property name="sessionCacheSize" value="10"/>
</bean>

4.d Create a MessageListenerAdapter to hook everything up and to set a POJO (amazonMessageListener) as our message listener:

<bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
 <property name="delegate" ref="amazonMessageListener"/>
 <property name="defaultListenerMethod" value="onMessage"/>
 <property name="defaultResponseDestination" ref="myQueue"/>
</bean>
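
The amazonMessageListener bean itself is not shown in the configuration, so here is a minimal sketch of what such a POJO could look like. The class name, the String parameter (which assumes text messages) and the received list are my assumptions for illustration. The sketch is kept free of Spring dependencies; to really use it as the errorHandler as well, the class would additionally have to declare that it implements org.springframework.util.ErrorHandler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical POJO that could back the "amazonMessageListener" bean.
class AmazonMessageListener {
    // Thread-safe list, because concurrent consumers may call the listener in parallel.
    private final List<String> received = new CopyOnWriteArrayList<>();

    // Matches defaultListenerMethod="onMessage"; assumes the payload arrives as text.
    public void onMessage(String text) {
        received.add(text);
    }

    // Same signature as handleError(Throwable) in Spring's ErrorHandler interface.
    public void handleError(Throwable t) {
        System.err.println("Listener failed: " + t.getMessage());
    }

    // Exposes what has been received so far (handy for tests).
    public List<String> getReceived() {
        return received;
    }
}
```

Because it is a plain object, the listener can be unit tested by calling onMessage directly, without any queue.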

Idan.


JMX support guide for your Spring-Integration application

Hi,

In the enterprise world, JMX is essential for monitoring our applications and raising alerts.

Enabling JMX in a Spring Integration application is pretty easy.

With JMX support we can get reports on message delivery and current message status, and control our channels and endpoint modules.

So how do we do that?

1. Add the JMX namespace and its schema location (inside xsi:schemaLocation) to the Spring Integration XML:

xmlns:int-jmx="http://www.springframework.org/schema/integration/jmx"
http://www.springframework.org/schema/integration/jmx
 http://www.springframework.org/schema/integration/jmx/spring-integration-jmx-2.1.xsd

2. Add the following tag to enable the MBean server (this server will be able to export all our annotated beans, including the Spring Integration ones):

<context:mbean-server />

3. We need an HTML adaptor in order to see all the MBeans in our browser:

<bean name="adaptor:name=htmlAdaptor" class="com.sun.jdmk.comm.HtmlAdaptorServer"
 init-method="start" />

4. Enable the export of Spring Integration beans to JMX:

<int-jmx:mbean-export id="integrationMbeanExporter"
 default-domain="spring.application" />

<context:mbean-export default-domain="spring.application" />
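
If you want to check the result without a browser, the following sketch lists every MBean registered under a given domain. It assumes the beans were exported to the JVM's platform MBeanServer and that the code runs inside the same JVM; the class and method names are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for illustration; not part of Spring Integration.
class MBeanLister {
    // Returns the sorted canonical names of all MBeans in the given domain.
    static Set<String> namesInDomain(String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<String> names = new TreeSet<>();
        try {
            // "domain:*" is a pattern matching every MBean in that domain.
            for (ObjectName name : server.queryNames(new ObjectName(domain + ":*"), null)) {
                names.add(name.getCanonicalName());
            }
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid JMX domain: " + domain, e);
        }
        return names;
    }

    public static void main(String[] args) {
        // "spring.application" is the default-domain used in the configuration above.
        for (String name : namesInDomain("spring.application")) {
            System.out.println(name);
        }
    }
}
```

An empty result usually means the exporter did not run or registered its beans in a different MBean server.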

That will do the job,

Idan.


Integrating Spring with a remote Topic

Hi,
I am going to demonstrate how to receive messages from a remote topic.

The remote Topic is hosted on a JBoss application server.

1. Add the topic implementation JARs to the Spring application's lib classpath.

2. We are going to create a jmsTemplate, which keeps the Java code pretty minimal and simple:

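import javax.jms.Message;
import javax.jms.MessageListener;

public class TgwMDB implements MessageListener
{

	@Override
	public void onMessage(Message msg)
	{
		// Handle the message received from the topic here.
	}
}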

3. Most of the configuration lives in the application context XML:

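3.1 Declare jmsTemplate with the following JNDI settings:

	java.naming.provider.url = jnp://server.ip:1099
	java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory
	java.naming.factory.url.pkgs = org.jboss.naming:org.jnp.interfaces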
3.2 Declare jndiTopicConnFactory.

3.3 Declare ConnectionFactory.

3.4 Declare DestinationResolver.

3.5 Declare DefaultMessageListenerContainer, which will act as a cached message listener. Its pubSubDomain property turns the listener from a Queue listener into a Topic listener.

You are now ready to receive messages.

Idan.


Accessing JBoss JMX programmatically

Hi,

The next example explains how to retrieve attributes from the JMX console via Java.

The reason to access the JMX console is that you may need values which you won’t be able to retrieve via the common API.

For example, if we want to get the current subscribers of a specific Topic/Queue resource, we won’t be able to get them via the common JMS API.

We need to access JMX (locally or remotely) and read the attribute we want.

Some code:

An example of connecting to JMX and reading the current number of subscribers listening to a Topic:


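import javax.management.MBeanServerConnection;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

	// Shared by getCurrentTopicSubscribers() and monitorJMS().
	private static MBeanServerConnection server;

	public static int getCurrentTopicSubscribers()
	{
		try
		{
			// Use this code when you are connecting remotely (outside the JBoss container).
			// It also needs java.util.Hashtable, javax.naming.Context and javax.naming.InitialContext.
			//------------------------------------
//			Hashtable<String, String> ht = new Hashtable<String, String>();
//			ht.put(Context.INITIAL_CONTEXT_FACTORY,
//					"org.jnp.interfaces.NamingContextFactory");
//			ht.put(Context.PROVIDER_URL, "localhost:1099");
//			ht.put(Context.SECURITY_PRINCIPAL, "admin");
//			ht.put(Context.SECURITY_CREDENTIALS, "admin");
//			System.out.println("Getting InitialContext...");
//			Context ctx = new InitialContext(ht);
//			server = (MBeanServerConnection) ctx.lookup("jmx/invoker/RMIAdaptor");
			//---------------------------------

			// Connection within the JBoss container.
			server = MBeanServerFactory.findMBeanServer(null).get(0);
			if (server != null)
			{
				monitorJMS();
				ObjectName topic = new ObjectName(
						"jboss.messaging.destination:name=DispatcherTopic,service=Topic");
				return (Integer) server.getAttribute(topic, "AllSubscriptionsCount");
			}
			else
			{
				log.error("Couldn't connect to JMX server");
			}
		}
		catch (Exception e)
		{
			log.error("Couldn't connect to JMX server", e);
		}
		// -1 signals that the count could not be read.
		return -1;
	}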

Once we have a connection, we can easily retrieve the attributes from JMX:

public static void monitorJMS() throws Exception
	{
		ObjectName objectName = new ObjectName(
				"jboss.messaging.destination:name=DispatcherTopic,service=Topic");
		// server is the MBeanServerConnection obtained in getCurrentTopicSubscribers().
		System.out.println("DurableMessageCount = "
				+ (Integer) server.getAttribute(objectName, "DurableMessageCount"));
		System.out.println("AllSubscriptionsCount = "
				+ (Integer) server.getAttribute(objectName, "AllSubscriptionsCount"));
	}
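
To try the attribute lookup without a JBoss server at hand, here is a small self-contained sketch. It wraps getAttribute in a hypothetical helper (the class and method names are mine) and demonstrates it against the JVM's own platform MBeanServer, reading the standard ThreadCount attribute of java.lang:type=Threading. Against JBoss you would pass the connection obtained above together with the DispatcherTopic object name and AllSubscriptionsCount.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper for illustration; not part of JBoss.
class JmxAttributeReader {
    // Reads a numeric MBean attribute and returns it as an int.
    static int readInt(MBeanServerConnection server, String objectName, String attribute) {
        try {
            Object value = server.getAttribute(new ObjectName(objectName), attribute);
            return ((Number) value).intValue();
        } catch (JMException | IOException e) {
            throw new IllegalStateException(
                    "Could not read " + attribute + " from " + objectName, e);
        }
    }

    public static void main(String[] args) {
        // Local demo: the number of live threads in the current JVM.
        MBeanServerConnection local = ManagementFactory.getPlatformMBeanServer();
        System.out.println("ThreadCount = "
                + readInt(local, "java.lang:type=Threading", "ThreadCount"));
    }
}
```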

Idan.