Wednesday, March 24, 2010

Adding attachments to an existing email message

I thought that this would be a reasonably common use case but I couldn't find the information I needed on the Internet and I wasted quite few hours scratching my... ar.. head...

[Service Layer] Alfresco + HornetQ + Mule (+ Hyperic Agent)
Mule Service1
I use Mule 2.2.1 to read emails from a pop account. Simple and straight forward using the pop3 transport.

The emails are actually forwarded as attachments to a parent email (could be more than one email as an attachment to the email) so my service splits the emails using a splitMessage in a router. It then places the individual messages on a HornetQ (jms queue1)

Mule Service2
Mule then consumes from (jms queue1) and completes a transformation of any attachments. I read the email, and iterate the attachments, transform them to PDF (Open office and iText), add the pdfs as attachments to the email, and finally place the email on HornetQ (jms queue2)

Mule Service3
Mule consumes from (jms queue2), saves messages into an alfresco repository using web services, creates a serialised transfer object (mytransfer.class) containing the alfresco node reference to the folder holding the attachments, and which places the transfer object on HornetQ (jms queue3).

I could do all this as one service however it is split into separate transactions so that we get the benefit of load balancing. The service layer is actually more than one VM and the HornetQ is clustered over those instances so any instance (the one with free capacity) consumes from the queue. Neat huh!?

[Portal Layer] Liferay Portal + Mule + JBPM
Mule Service4
In our portal we embed Mule which listens to (jms queue3). It consumes the transfer object and uses that information to link to the alfresco folder and start a JBPM workflow process. (Load balanced between the pooled portal instances)

Now the point to this blog is... gotcha at Mule Service2 (adding new attachments to an existing multipart message).

When you add attachments to an existing email (mimeMessage) you must call saveChanges() on it so that the headers are updated to reflect the changed content.

Tuesday, March 16, 2010

Mule and HornetQ

HornetQ JMS comes with a lot of examples but they all run on 'localhost' and my environment is distributed. I wanted to connect things together in an indirect / loosely coupled way. It took me a while to find a solution to this as I couldn't find any suitable information on the Internet. I didn't want to use JNDI as I knew that HornetQ broadcasts on UDP within the network. I firmly believed that "there must be a way to find that broadcast and get the connector from HornetQ in a loosely coupled way" (indirection). I eventually found the (a) solution...

The Mule configuration file is shown here. You will notice that the connection Factory injected properties contain discoveryAddress, discoveryPort and discoveryInitialWaitTimeout. The discoveryAddress and discoveryPort are the same address and port that is configured in your hornetq-configuration.xml so that it listens for the broadcast. The discoveryInitialWaitTimeout isn't quite so intuitive. In your hornetq-configuration.xml you will see that the default broadcast-period is 5000 (milliseconds). I figured that if you double that then you are guaranteed to catch a broadcast in 10000 milliseconds. If you miss the broadcast the connection will fail (which makes sense to me)

 <?xml version="1.0" encoding="UTF-8"?> 
<mule xmlns="http://www.mulesource.org/schema/mule/core/2.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:stdio="http://www.mulesource.org/schema/mule/stdio/2.2"
xmlns:vm="http://www.mulesource.org/schema/mule/vm/2.2"
xmlns:jms="http://www.mulesource.org/schema/mule/jms/2.2"
xmlns:quartz="http://www.mulesource.org/schema/mule/quartz/2.2"
xmlns:cxf="http://www.mulesource.org/schema/mule/cxf/2.2"
xmlns:cxf-core="http://cxf.apache.org/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.mulesource.org/schema/mule/core/2.2 http://www.mulesource.org/schema/mule/core/2.2/mule.xsd
http://www.mulesource.org/schema/mule/stdio/2.2 http://www.mulesource.org/schema/mule/stdio/2.2/mule-stdio.xsd
http://www.mulesource.org/schema/mule/vm/2.2 http://www.mulesource.org/schema/mule/vm/2.2/mule-vm.xsd
http://www.mulesource.org/schema/mule/jms/2.2 http://www.mulesource.org/schema/mule/jms/2.2/mule-jms.xsd
http://www.mulesource.org/schema/mule/quartz/2.2 http://www.mulesource.org/schema/mule/quartz/2.2/mule-quartz.xsd
http://www.mulesource.org/schema/mule/cxf/2.2 http://www.mulesource.org/schema/mule/cxf/2.2/mule-cxf.xsd
http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd">
<spring:beans>
<spring:bean name="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
<spring:constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory">
<spring:constructor-arg ref="transportConfiguration"/>
<spring:property name="minLargeMessageSize" value="250000"/>
<spring:property name="cacheLargeMessagesClient" value="false"/>
<spring:property name="discoveryAddress" value="231.7.7.7"/>
<spring:property name="discoveryPort" value="9876"/>
<spring:property name="discoveryInitialWaitTimeout" value="10000"/>
</spring:bean>
<spring:bean id="foreverRetryPolicyTemplate" class="org.mule.modules.common.retry.policies.ForeverRetryPolicyTemplate" />
<spring:bean id="ThreadingPolicyTemplate" class="org.mule.modules.common.retry.policies.AdaptiveRetryPolicyTemplateWrapper">
<spring:property name="delegate" ref="foreverRetryPolicyTemplate" />
</spring:bean>
</spring:beans>
<jms:connector name="jmsConnector"
connectionFactory-ref="connectionFactory"
createMultipleTransactedReceivers="false"
numberOfConcurrentTransactedReceivers="1"
specification="1.1"
username="myusername"
password="mypassword">
<spring:property name="retryPolicyTemplate" ref="ThreadingPolicyTemplate" />
</jms:connector>
<vm:connector name="localvm"/>
<jms:object-to-jmsmessage-transformer name="ObjectToJMSMessageTransformer"/>
<jms:jmsmessage-to-object-transformer name="JMSMessageToObjectTransformer"/>
<model name="importApplicationModel">
<service name="importApplicationService">
<inbound>
<jms:inbound-endpoint queue="va.import.3">
<jms:transaction action="ALWAYS_BEGIN"/>
</jms:inbound-endpoint>
</inbound>
<component>
<spring-object bean="importApplicationConsumer" />
</component>
</service>
</model>
<model name="jmsTestModel">
<service name="jmsTestService">
<inbound>
<vm:inbound-endpoint address="vm://jmsTest" connector-ref="localvm"/>
</inbound>
<outbound>
<pass-through-router>
<jms:outbound-endpoint queue="va.testloop.1"/>
</pass-through-router>
</outbound>
</service>
</model>
</mule>


I also needed to consume from one of my queues in my portal code where mule was not available and that is very similar:

 Queue queue = HornetQJMSClient.createQueue("va.testloop.2");  
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory hqcf = new HornetQConnectionFactory(transportConfiguration);
hqcf.setDiscoveryPort(9876);
hqcf.setDiscoveryAddress("231.7.7.7");
hqcf.setDiscoveryInitialWaitTimeout(10000);
connection = hqcf.createConnection("myusername","mypassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
HornetQTextMessage thing = (HornetQTextMessage)messageConsumer.receive(5000);


This has to be within a try/catch block. You need to follow the HornetQ User Manual regarding client librarys on your classpath and the necessary imports are:

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQTextMessage;

You will also note that the JMS queues are protected by username and password but that is just standard setup for HornetQ so refer to the HornetQ User Manual for instructions. Hope this snippet helps.