Tuesday, March 16, 2010

Mule and HornetQ

HornetQ JMS comes with a lot of examples, but they all run on 'localhost' and my environment is distributed. I wanted to connect things together in an indirect, loosely coupled way. It took me a while to find a solution, as I couldn't find any suitable information on the Internet. I didn't want to use JNDI, because I knew that HornetQ broadcasts on UDP within the network. I firmly believed that there must be a way to pick up that broadcast and get the connector from HornetQ without hard-coding the server location (indirection). I eventually found a solution.

The Mule configuration file is shown here. You will notice that the properties injected into the connectionFactory bean include discoveryAddress, discoveryPort and discoveryInitialWaitTimeout. The discoveryAddress and discoveryPort must match the address and port configured for the broadcast in your hornetq-configuration.xml, so that the client listens where the server broadcasts. The discoveryInitialWaitTimeout isn't quite so intuitive. In your hornetq-configuration.xml you will see that the default broadcast-period is 5000 (milliseconds). I figured that if you double that, the client should see at least one broadcast within 10000 milliseconds. This is a rule of thumb rather than a guarantee, since UDP does not guarantee delivery. If you miss the broadcast, the connection will fail (which makes sense to me).

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:stdio="http://www.mulesource.org/schema/mule/stdio/2.2"
      xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
      xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2"
      xmlns:quartz="http://www.mulesource.org/schema/mule/quartz/2.2"
      xmlns:cxf="http://www.mulesource.org/schema/mule/cxf/2.2"
      xmlns:cxf-core="http://cxf.apache.org/core"
      xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
        http://www.mulesource.org/schema/mule/stdio/2.2 http://www.mulesource.org/schema/mule/stdio/2.2/mule-stdio.xsd
        http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
        http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd
        http://www.mulesource.org/schema/mule/quartz/2.2 http://www.mulesource.org/schema/mule/quartz/2.2/mule-quartz.xsd
        http://www.mulesource.org/schema/mule/cxf/2.2 http://www.mulesource.org/schema/mule/cxf/2.2/mule-cxf.xsd
        http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd">
  <spring:beans>
    <spring:bean name="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
      <spring:constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory"/>
    </spring:bean>
    <spring:bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory">
      <spring:constructor-arg ref="transportConfiguration"/>
      <spring:property name="minLargeMessageSize" value="250000"/>
      <spring:property name="cacheLargeMessagesClient" value="false"/>
      <spring:property name="discoveryAddress" value="231.7.7.7"/>
      <spring:property name="discoveryPort" value="9876"/>
      <spring:property name="discoveryInitialWaitTimeout" value="10000"/>
    </spring:bean>
    <spring:bean id="foreverRetryPolicyTemplate" class="org.mule.modules.common.retry.policies.ForeverRetryPolicyTemplate" />
    <spring:bean id="ThreadingPolicyTemplate" class="org.mule.modules.common.retry.policies.AdaptiveRetryPolicyTemplateWrapper">
      <spring:property name="delegate" ref="foreverRetryPolicyTemplate" />
    </spring:bean>
  </spring:beans>
  <jms:connector name="jmsConnector"
                 connectionFactory-ref="connectionFactory"
                 createMultipleTransactedReceivers="false"
                 numberOfConcurrentTransactedReceivers="1"
                 specification="1.1"
                 username="myusername"
                 password="mypassword">
    <spring:property name="retryPolicyTemplate" ref="ThreadingPolicyTemplate" />
  </jms:connector>
  <vm:connector name="localvm"/>
  <jms:object-to-jmsmessage-transformer name="ObjectToJMSMessageTransformer"/>
  <jms:jmsmessage-to-object-transformer name="JMSMessageToObjectTransformer"/>
  <model name="importApplicationModel">
    <service name="importApplicationService">
      <inbound>
        <jms:inbound-endpoint queue="va.import.3">
          <jms:transaction action="ALWAYS_BEGIN"/>
        </jms:inbound-endpoint>
      </inbound>
      <component>
        <spring-object bean="importApplicationConsumer" />
      </component>
    </service>
  </model>
  <model name="jmsTestModel">
    <service name="jmsTestService">
      <inbound>
        <vm:inbound-endpoint address="vm://jmsTest" connector-ref="localvm"/>
      </inbound>
      <outbound>
        <pass-through-router>
          <jms:outbound-endpoint queue="va.testloop.1"/>
        </pass-through-router>
      </outbound>
    </service>
  </model>
</mule>
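
If the connection fails, it can help to check whether the discovery broadcast reaches the client machine at all. The following is only a rough diagnostic sketch using plain JDK classes, not part of HornetQ or Mule: the class name DiscoveryProbe and both method names are made up for illustration, and it assumes the group address 231.7.7.7 and port 9876 used above. It counts any UDP datagram on that group as a hit and does not verify that the sender is HornetQ.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketTimeoutException;

// Hypothetical helper: listens on the discovery group for one datagram.
public class DiscoveryProbe {

    // Rule of thumb from the text: wait for twice the broadcast period.
    static int initialWaitTimeoutMillis(int broadcastPeriodMillis) {
        return 2 * broadcastPeriodMillis;
    }

    // Returns true if any datagram arrives on the group before the timeout.
    static boolean broadcastSeen(String group, int port, int timeoutMillis) throws IOException {
        try (MulticastSocket socket = new MulticastSocket(port)) {
            // A null interface lets the JDK pick the default network interface.
            socket.joinGroup(new InetSocketAddress(InetAddress.getByName(group), port), null);
            socket.setSoTimeout(timeoutMillis);
            byte[] buffer = new byte[4096];
            try {
                socket.receive(new DatagramPacket(buffer, buffer.length));
                return true;
            } catch (SocketTimeoutException e) {
                return false; // nothing arrived within the timeout
            }
        }
    }

    public static void main(String[] args) {
        int timeout = initialWaitTimeoutMillis(5000); // default broadcast-period
        try {
            boolean seen = broadcastSeen("231.7.7.7", 9876, timeout);
            System.out.println(seen ? "broadcast received"
                                    : "no broadcast within " + timeout + " ms");
        } catch (IOException e) {
            System.out.println("could not listen for the broadcast: " + e.getMessage());
        }
    }
}
```

If this reports no broadcast on the client machine, the likely suspects are multicast routing or a firewall between the machines rather than the Mule or HornetQ configuration.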


I also needed to consume from one of my queues in my portal code, where Mule was not available. The plain Java version is very similar:

// Create a client-side reference to the queue (no JNDI lookup).
Queue queue = HornetQJMSClient.createQueue("va.testloop.2");
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory hqcf = new HornetQConnectionFactory(transportConfiguration);
// Same discovery group, port and wait timeout as in the Mule configuration.
hqcf.setDiscoveryPort(9876);
hqcf.setDiscoveryAddress("231.7.7.7");
hqcf.setDiscoveryInitialWaitTimeout(10000);
Connection connection = hqcf.createConnection("myusername","mypassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
// Start delivery, then wait up to 5 seconds; receive returns null on timeout.
connection.start();
HornetQTextMessage thing = (HornetQTextMessage)messageConsumer.receive(5000);


This has to be within a try/catch block, because the JMS calls throw the checked JMSException. You need to follow the HornetQ User Manual regarding the client libraries on your classpath. The necessary imports are:

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQTextMessage;

You will also note that the JMS queues are protected by a username and password. That is just standard setup for HornetQ, so refer to the HornetQ User Manual for instructions. Hope this snippet helps.
