Showing posts with label JMS. Show all posts

Tuesday, July 15, 2008

Creating a JMS Bridge with Spring Integration

In my last post, I discussed replacing Mule with Spring Integration. While I was showing that to my colleague Rod Biresch, he thought it would be interesting to use Spring Integration as a JMS Bridge. He ran into an issue last year (described in his post OpenESB and Glassfish: JMS Messaging) where he needed to replace Glassfish's Messaging Broker with a remote instance of ActiveMQ. From there, he was going to use an MDB to pull the message off the queue, and then write JMS code to place the same message on another broker's (non-ActiveMQ) queue. At the time, that was certainly an acceptable way to accomplish that. Another option would have been to use Mule to bridge the queues. Today several more options exist, including Spring Integration and Camel (among others). The nice thing about these is that they integrate well with Spring and are very lightweight.

In this post, I will show how to use Spring Integration as a JMS Bridge between ActiveMQ and JBossMQ. I am using Spring Integration 1.0M5, ActiveMQ 5.1 (standalone), and JBoss 4.2.2 GA Server. Of course, this concept can be extended to bridge any two JMS providers.

In the Spring configuration file, we need to specify a listener (JMS Gateway) to receive the message off the ActiveMQ queue and place it on a channel, a channel adapter to take the messages from the channel and send them to a target endpoint, and a target endpoint that places the messages on the JBoss queue. And, of course, we need a message bus to coordinate everything.

Below is the Spring Integration related configuration as previously described:

<!-- define the message bus -->
<integration:message-bus/>

<!-- necessary channels -->
<integration:channel id="bridgeChannel"/>

<!-- JMS Source that will receive messages -->
<integration:jms-gateway request-channel="bridgeChannel" connection-factory="jmsFactory" destination-name="alert.inbound.notification.queue" expect-reply="false"/>

<!-- define the channel adapter -->
<integration:channel-adapter channel="bridgeChannel" target="jmsTarget"/>

<integration:jms-target id="jmsTarget" connection-factory="jmsJBossConnFactory" destination="jBossQueue"/>
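To make the moving parts concrete, here is a minimal plain-Java sketch of the same flow. It uses only java.util.concurrent, not Spring Integration or JMS: the in-memory queues merely stand in for the ActiveMQ queue, the bridgeChannel, and the JBoss queue, and the names BridgeSketch and forwardAll are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative model only: in-memory queues stand in for the two brokers.
// None of these names come from Spring Integration or JMS.
public class BridgeSketch {

    // Moves every message currently waiting on 'from' onto 'to', preserving order.
    static int forwardAll(BlockingQueue<String> from, BlockingQueue<String> to) {
        int moved = 0;
        String message;
        while ((message = from.poll()) != null) {
            to.add(message);
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        BlockingQueue<String> activeMqQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> bridgeChannel = new LinkedBlockingQueue<>();
        BlockingQueue<String> jbossQueue = new LinkedBlockingQueue<>();

        activeMqQueue.add("<alert>disk full</alert>");
        activeMqQueue.add("<alert>cpu high</alert>");

        forwardAll(activeMqQueue, bridgeChannel); // role of the jms-gateway
        forwardAll(bridgeChannel, jbossQueue);    // role of the channel adapter and jms-target

        System.out.println(jbossQueue);
    }
}
```

In the real configuration the message bus does this hand-off continuously as messages arrive, so there is no explicit loop to write.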

Of course along with this we need the required connection factories defined, as shown here:

<!-- JMS connection factory for ActiveMQ -->
<bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
</bean>

<!-- Connection information for JBoss Queue -->
<jee:jndi-lookup jndi-name="ConnectionFactory" id="jmsJBossConnFactory" environment-ref="jndiProps" expected-type="javax.jms.ConnectionFactory"/>

<jee:jndi-lookup jndi-name="queue/A" id="jBossQueue" environment-ref="jndiProps" expected-type="javax.jms.Queue"/>

<util:properties id="jndiProps" location="classpath:jndi.properties"/>

We also need a way to start up the Spring Context. If you are writing a web application, this would happen through configuration. In my case, I was writing a standalone application for testing purposes, so I wrote a simple Java class to load the Spring Context:

public static void main( String[] args ) {
    AbstractApplicationContext context =
        new ClassPathXmlApplicationContext("spring-integration-config.xml");
}

I used the web console included with the standalone ActiveMQ distribution along with the Administration Console for JBoss to validate everything worked. You could just as easily use Hermes or something like that too.

Important safety tip: The ActiveMQ web console doesn't always work as it appears. When trying to place a message on the queue, do not use the "Send" link within the header area of the page. Use the "Send To" link in the operations column of the queue listing page. I wasted a little time troubleshooting what I thought was a problem :)

If you are using Maven, here is the pom I used for this application:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.chariotsolutions.jms.adapter</groupId>
  <artifactId>JMSAdapter</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>JMSAdapter</name>
  <url>http://maven.apache.org</url>
  <repositories>
    <repository>
      <id>com.springsource.repository.bundles.milestone</id>
      <url>http://repository.springsource.com/maven/bundles/milestone</url>
    </repository>
    <repository>
      <id>com.springsource.repository.bundles.release</id>
      <url>http://repository.springsource.com/maven/bundles/release</url>
    </repository>
    <repository>
      <id>com.springsource.repository.bundles.external</id>
      <url>http://repository.springsource.com/maven/bundles/external</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>org.springframework.integration</artifactId>
      <version>1.0.0.M5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>org.springframework.integration.adapter</artifactId>
      <version>1.0.0.M5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.1.0</version>
    </dependency>
    <dependency>
      <groupId>jboss</groupId>
      <artifactId>jbossmq-client</artifactId>
      <version>4.0.2</version>
    </dependency>
    <dependency>
      <groupId>jboss</groupId>
      <artifactId>jnp-client</artifactId>
      <version>4.0.2</version>
    </dependency>
    <dependency>
      <groupId>jboss</groupId>
      <artifactId>jboss-common</artifactId>
      <version>4.0.2</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.13</version>
    </dependency>
    <dependency>
      <groupId>concurrent</groupId>
      <artifactId>concurrent</artifactId>
      <version>1.3.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>


And it was as simple as that. I spent more time getting the Maven dependencies figured out than anything else.

Thursday, May 17, 2007

BPEL Correlations...as easy as it sounds?

In my previous post I discussed the basics of BPEL correlations. In this post, I want to present the challenge I faced trying to apply correlations to my business process, which unfortunately for me does not align with any vendor tutorials I have seen. Imagine that :)

The portion of my business process which requires correlation involves calling out to a notification web service and supplying an unbounded set of people whom I want to notify. This notification web service then responds with a single notification identifier (notification ID) representing the notification sent. At some future point in time, the people notified will respond (or the notification system will let me know there was no response). These responses can be received in any order at any time, and are received via a web service call into my ESB. The payload of this web service call includes the notification ID and the response of the person. After receiving the response, my business process updates the person information with the response and continues on. This is exactly what correlations are made for. Perfect!

Challenge 1: If I define correlation initialization on the notification web service invocation activity, this will persist 1 instance of the business process. But I have multiple responses coming back. After the first response is received, I have no business process instances waiting.

Solution 1a: I'll just loop around the web service invocation for each person in my set of people and make individual calls out to the notification web service. This will create multiple notification IDs and therefore persist multiple business process instances. So as responses come back, each one will correspond to its own instance. The business process would look something like this:

Problems: First, the business process will always run to completion as the response web service implementation gets invoked after the while loop (i.e. there is no receive task for the process to wait on). So, if we replace the web service implementation with a JMS receive task we can get over that hurdle. All we have to do is create another business process that implements a web service, and within the implementation it uses a JMS send activity to send the notification response to the corresponding JMS queue that this process is monitoring. The new business processes look like this:



Now the problem is that we really only have one business process instance being created. Although it seems like multiple business process instances are being persisted, each one really maps to the same instance. So, once the 1st response is received, the persisted instance is gone and other responses are left hanging.

Solution 1b: So, we remove the loop here and will need to create another business process (we are now up to 3 business processes) that sends messages with a single person to the inbound JMS queue. This will cause a new business process instance to be created for each person, so each response can now be correlated to one of these instances.
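The difference between Challenge 1 and Solution 1b can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions: a HashMap stands in for the BPEL engine's persistence store, and the names FanOutSketch, notifyPeople, and respond are hypothetical rather than anything from CAPS.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: a map stands in for the engine's persisted instances.
public class FanOutSketch {
    // Waiting process instances, keyed by the notification ID they were persisted under.
    private final Map<Integer, String> waiting = new HashMap<>();
    private int nextNotificationId = 1;

    // Models one call to the notification service: one ID, one persisted instance.
    int notifyPeople(List<String> people) {
        int id = nextNotificationId++;
        waiting.put(id, String.join(",", people));
        return id;
    }

    // Models an incoming response: true if a waiting instance was found and resumed.
    boolean respond(int notificationId) {
        return waiting.remove(notificationId) != null;
    }

    public static void main(String[] args) {
        // Challenge 1: one call for everyone, so only one instance is waiting.
        FanOutSketch single = new FanOutSketch();
        int sharedId = single.notifyPeople(Arrays.asList("ann", "bob", "cat"));
        System.out.println("first response matched:  " + single.respond(sharedId));
        System.out.println("second response matched: " + single.respond(sharedId));

        // Solution 1b: one message per person, so each response has its own instance.
        FanOutSketch perPerson = new FanOutSketch();
        for (String person : Arrays.asList("ann", "bob", "cat")) {
            int id = perPerson.notifyPeople(Arrays.asList(person));
            System.out.println(person + " matched: " + perPerson.respond(id));
        }
    }
}
```

In the first half, only the first response finds an instance to resume, which is the "left hanging" behavior described above.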

Challenge 2: How do we define the correlation set so that it actually works? Our correlation aliases were initially defined as the notification ID returning from the notification web service invocation and the JMS receive message text. Since the message text value (an XML message) does not match the notification ID (an int data type), the correlation is not working.

Solution: This was a difficult one. I went through many iterations with problems....err challenges encountered with each one. Here they are:

(1) Add an unmarshal task so that we can correlate on the resulting notification ID value. This will require using correlation on the Unmarshal activity. Since the JMS receive activity, which occurs prior to the unmarshal, causes the BPEL engine to retrieve an instance from persistence, and no correlation set is defined for this task, the engine grabs any instance. So essentially there is no real correlation going on.

(2) Try using the JMS message header correlation ID. This involved mapping the response notification ID to the correlation ID of the JMS message header prior to placing this on the response queue (in our second business process). The issue here was the correlation ID message property is defined as a string, while the notification ID is an int. So CAPS will not let you define an appropriate correlation key/correlation set that will work. The data types of each alias need to be the same, which makes sense. Also, since my business process has 2 JMS receive tasks, the BPEL engine gets confused when trying to create the instance identifier using the correlation ID of the JMS message header.

(3) Call for help. I discussed this problem with someone I know from Sun and he was able to provide a pre-release of some documentation that provided a good amount of insight into how to deal with correlations. The result of this newfound knowledge was the creation of one more business process (for a total of 4) that actually had 2 JMS receive tasks. I could then create a correlation set based on the JMS message header correlationID and use correlations on each of the JMS receive tasks. The following images show 3 of the 4 business processes (the one not shown is simply a loop that places messages on the inbound JMS queue for the notification web service invocation process).

Notification Web Service Invocation
Notification Response Web Service Implementation

Combined Business Process Using Correlation


First, a message is placed on the inbound queue of the notification web service invocation process. This calls the external web service and uses the response to place a message on the correlation.queue JMS queue. Also, the notification ID that is the response from the web service invocation is set to the JMS message header correlationID property.

Recall that the Combined Business Process has a correlation set defined to be the correlationID message header property. The 2 JMS receive activities are set to use correlations. The first receive task initializes it and the second uses it.

So, once the first message is received, an instance of the Combined Business Process is created and then persisted (since it is now waiting on the second JMS receive). Now, at some future point in time, a person responds to the notification (or the service responds with "no response"). The response includes the notification ID and the response. The Notification Response Web Service Implementation maps the notification ID to the correlationID message property and places the response on the response.queue.

The second JMS receive task on the Combined Business Process is using this queue. This causes the BPEL engine to retrieve the business process instance that corresponds to the notification ID contained within the person's response. The process instance is retrieved (along with its state), and the business process continues to completion. This implementation of correlation finally worked.
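The key detail in this final design is that both producers write the int notification ID into the string correlationID header in the same way, so the two receives compare like with like. A minimal plain-Java sketch of that idea follows. It assumes nothing about the CAPS API: Msg is a hypothetical stand-in for a JMS message, the map stands in for the persisted instances, and the notification IDs are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: no JMS or CAPS classes are involved.
public class CorrelationHeaderSketch {
    // Minimal stand-in for a JMS text message: a string correlation ID header plus a body.
    static final class Msg {
        final String correlationId;
        final String body;

        Msg(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Both producers must derive the header the same way from the int notification ID.
    static String toCorrelationId(int notificationId) {
        return Integer.toString(notificationId);
    }

    // Instances parked between the first and second receive, keyed by the header value.
    private final Map<String, String> waiting = new HashMap<>();

    // First receive (correlation.queue): initializes the correlation set.
    void onCorrelationQueue(Msg message) {
        waiting.put(message.correlationId, message.body);
    }

    // Second receive (response.queue): uses the correlation set to find the instance.
    String onResponseQueue(Msg message) {
        String person = waiting.remove(message.correlationId);
        return person == null ? null : person + " responded: " + message.body;
    }

    public static void main(String[] args) {
        CorrelationHeaderSketch process = new CorrelationHeaderSketch();
        int notificationId = 1001; // hypothetical ID returned by the notification service
        process.onCorrelationQueue(new Msg(toCorrelationId(notificationId), "ann"));
        System.out.println(process.onResponseQueue(new Msg(toCorrelationId(1001), "ACK")));
        System.out.println(process.onResponseQueue(new Msg(toCorrelationId(2002), "ACK")));
    }
}
```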

From initial design to final implementation took longer (and was more painful) than it sounds. As a result I got a great understanding of how correlation works within CAPS and I am confident that this knowledge will transfer to other products as well.

Wednesday, May 16, 2007

BPEL Correlations introduced using Sun CAPS

It has been a while since my last post, so let me start with a quick update on my real world SOA journey. As I had mentioned previously, I was able to get some Sun Java CAPS training and was temporarily putting Cape Clear on the back burner. Well, I have been working with CAPS now for about a month and I find it very interesting. Fortunately, some of the issues I ran into with Cape Clear are no different on CAPS. In reality, they are more SOA issues than product issues. For example, invoking a web service that uses SOAP encoding (as discussed here). That being said, I could reuse much of the experience I gained with Cape Clear during my CAPS implementation.

Enough history (you can read my previous posts to catch up); this post is supposed to be about correlations. If you are familiar with JMS, correlation is probably nothing new. Basically, you are associating one message with another message via some identifier. In JMS you typically use the correlationID message header attribute and create the appropriate logic in your application. CAPS (and Cape Clear, as well as others) provides a correlation mechanism within a Business Process (using BPEL). BPEL correlation allows a process to be persisted while waiting for a correlating response. This is usually required when dealing with asynchronous exchanges.

A typical example is a purchase order and corresponding invoice (related via the PO number). A business process may receive a PO, do some processing, and then wait for the associated invoice. Once the invoice is received, the business process does some more processing using the PO and Invoice data, and eventually completes. While waiting for the Invoice, the process can be persisted either to memory or some more permanent store to survive server reboots, etc. The magic here is retrieving the correct business process instance once the corresponding invoice is received. This is accomplished via the correlation set. A correlation set is defined in different ways in different products, but it essentially represents some combination of message parts that are available to activities within the business process. So in this example, the PO number in the purchase order message and the PO number in the invoice message would comprise this correlation set. These message parts are typically called aliases.

Within CAPS, you define what is called a correlation key, which is made up of the aliases described above. Then you define a correlation set based on the correlation key. I'm not really sure why the extra step of creating a correlation key is needed, but those are the mechanics involved. As shown below, this correlation key/correlation set is defined on the properties sheet of the business process.


In this example, the correlation key is made up of a single alias, the message correlation ID. In reality, the alias can be made up of one or more message parts that exist in the business process.


Once defined, the correlation set needs to be applied to activities within the business process. One activity should initialize the correlation set, while another activity should use the correlation set.


The difference being the value for the "Initialize Set" field. Also, depending on the type of activity for which you are using correlation sets, you may get the following dialog:


On this dialog, you need to select where the application of the correlation set should take place, at the input of this activity, the output, or both. I'm not really sure what it means to use "both", but I hope to experiment with this sometime in the future.

In order to have the business process use the correlations, there must be a receive activity somewhere in the midst of the business process where we are waiting for a message. So in the typical example above, we may be waiting for the invoice to arrive on a message queue sometime after the PO has arrived in our business process.

So here is how the process works, using a simple business process shown below:

A PO comes in via JMS (PO number 123). The business process is instantiated. This receive activity is configured to initialize a correlation set as shown previously. Execution of the various activities and business rules will continue until the next receive activity is encountered (JMS.receive Invoice). At this point the business process is persisted using the identifier specified in the correlation set. In our example this would be the correlation ID message header property of the inbound JMS message (123). At some future time, an invoice shows up on the appropriate message queue for PO 123. This invoice has the correlation ID of the JMS message header set to the purchase order number, so the BPEL engine looks for a business process instance with a correlation identifier which includes the name of the correlation set (corrSetCorrelationID) and a value of 123. If this is found, the business process instance is retrieved (with its state intact) and the process continues to completion.
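The persist-and-retrieve mechanics of this walkthrough can be sketched in plain Java. This is a simplified model, not how the CAPS BPEL engine is implemented: a HashMap plays the persistence store, and the class and method names (CorrelationStoreSketch, receivePurchaseOrder, receiveInvoice) and the "name=value" key format are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: a map stands in for the BPEL engine's persistence store.
public class CorrelationStoreSketch {
    // State of a process instance that is parked at the second receive.
    static final class ProcessInstance {
        final String poNumber;
        String invoice;

        ProcessInstance(String poNumber) {
            this.poNumber = poNumber;
        }
    }

    private final String correlationSetName;
    private final Map<String, ProcessInstance> persisted = new HashMap<>();

    CorrelationStoreSketch(String correlationSetName) {
        this.correlationSetName = correlationSetName;
    }

    // The identifier combines the correlation set name with the correlated value.
    private String key(String value) {
        return correlationSetName + "=" + value;
    }

    // First receive: the PO arrives, the correlation set is initialized,
    // and the instance is persisted while it waits for the invoice.
    void receivePurchaseOrder(String correlationId) {
        persisted.put(key(correlationId), new ProcessInstance(correlationId));
    }

    // Second receive: the invoice arrives and the matching instance, if any,
    // is retrieved with its state intact. Returns null when nothing matches.
    ProcessInstance receiveInvoice(String correlationId, String invoice) {
        ProcessInstance instance = persisted.remove(key(correlationId));
        if (instance != null) {
            instance.invoice = invoice;
        }
        return instance;
    }

    public static void main(String[] args) {
        CorrelationStoreSketch engine = new CorrelationStoreSketch("corrSetCorrelationID");
        engine.receivePurchaseOrder("123");
        ProcessInstance resumed = engine.receiveInvoice("123", "invoice for PO 123");
        System.out.println(resumed.poNumber + " resumed with: " + resumed.invoice);
        System.out.println("unknown PO matched: " + (engine.receiveInvoice("999", "stray") != null));
    }
}
```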

This is a really simple example which uses the JMS message header as the correlation set alias. This requires the applications creating the messages to populate this field prior to sending the message. A more elegant solution is to use actual message values to correlate on, but the goal of this post was to introduce the basics of correlation.

You can find tutorials that show how easy this is to do on many of the SOA product suites. Tutorials are great, but they hardly translate to real world problems. Here is where my real world "challenge" came into play. The CAPS tutorials that I found really didn't explain the entire correlation configuration and mechanics. In my next post, I'll discuss the challenges I encountered applying correlation to my business process (described here) and the final implementation. Although this will present correlation relative to CAPS, I believe this will translate well into other product implementations of correlation.

Wednesday, March 14, 2007

First Step: Receive a JMS Message

This blog continues the story from my previous post (The Virtual Enterprise). In this installment I will describe what I had to go through in order for my business process to receive an XML message from a JMS queue (using ActiveMQ as the provider). Recall that I am implementing my business process using Cape Clear.

As a point of clarification, I have yet to receive any formal training in Cape Clear's product and am attempting to implement my business process by referring to documentation and tutorials available on Cape Clear's developer web site (http://developer.capeclear.com/). To Cape Clear's credit, they do have a good amount of documentation available to the public. Just to make it a little tougher, I chose to use Version 7 of their product, which (at the time of my effort) is in beta (some may call me a glutton for punishment).

In Cape Clear's world, everything is a web service communicating using SOAP. Also, as for supported JMS providers, ActiveMQ is not among them. They do support (out of the box) IBM, BEA, and JBoss. As an additional piece of information, my Cape Clear installation is deployed on JBoss (other options include BEA, IBM, Tomcat).

So, here is problem number 1 (or is that challenge number 1?). How do I get a JMS XML message (non-SOAP) sitting on an ActiveMQ message queue to kick off a business process (BPEL) deployed to Cape Clear?

Just to be specific, there are actually 2 problems here. First, using ActiveMQ as the JMS provider, and second, handling a non-SOAP message. Looking back (after a little teeth grinding and mental cursing), the solution was "easy".

First, I configured the JBoss server to access an ActiveMQ broker running on another machine. This part was fairly straightforward using the ActiveMQ resource adapter. I followed directions that I found here (modified for my specific versions of the products and to use a remote broker in place of the embedded broker). Once I got this working, the next step was to configure the Cape Clear server to route messages on this ActiveMQ queue to my new BPEL process. This was accomplished by configuring a "transport" on the Cape Clear server. I essentially created a JMS JBossMQ transport, but in place of the standard connection factories and queue names, I used the JNDI names for the ActiveMQ connection factories and queue names that I configured with the ActiveMQ resource adapter. And then, like magic, I was able to receive messages from an ActiveMQ queue.

Now, how do I get the BPEL process to handle a non-SOAP message? Fortunately, as I stated earlier, Cape Clear has a good amount of documentation and tutorials on their site. I was able to find an example of handling non-SOAP XML messages in an Orchestration Web Service (i.e. a BPEL process deployed as a web service). This tutorial was created for version 6 of their product, but works as well on version 7. Essentially what happens is that you define an XSLT transformation to wrap a non-SOAP message in the appropriate SOAP header and body tags to make it a SOAP message. This XSLT is applied prior to invoking the BPEL process by configuring an interceptor for the BPEL process deployed within the Cape Clear server. How much this additional step potentially impacts performance is TBD and will be addressed sometime later in my road to discovery :). I wonder why Cape Clear doesn't just make that a configuration option for a BPEL web service and then perform some sort of optimized operation to receive the non-SOAP message, or skip the SOAP message parsing entirely and just pass the message directly to the BPEL?
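For readers who have not seen this kind of wrapping transformation, here is a minimal sketch using the JDK's built-in XSLT support (javax.xml.transform). It is not the stylesheet from the Cape Clear tutorial and says nothing about how the interceptor is configured. The SOAP 1.1 envelope namespace, the sample alert message, and the class name SoapWrapSketch are all assumptions made for illustration.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Illustrative only: wraps a plain XML document in a SOAP envelope via XSLT.
public class SoapWrapSketch {
    // Stylesheet that copies the incoming root element into a SOAP Body.
    // The SOAP 1.1 namespace is an assumption; the post does not name a version.
    static final String WRAP_XSLT =
        "<xsl:stylesheet version=\"1.0\""
      + " xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\""
      + " xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
      + "<xsl:template match=\"/\">"
      + "<soapenv:Envelope><soapenv:Header/><soapenv:Body>"
      + "<xsl:copy-of select=\"*\"/>"
      + "</soapenv:Body></soapenv:Envelope>"
      + "</xsl:template>"
      + "</xsl:stylesheet>";

    // Applies the stylesheet to the given XML text and returns the wrapped message.
    static String wrap(String xml) throws TransformerException {
        Transformer transformer = TransformerFactory.newInstance()
            .newTransformer(new StreamSource(new StringReader(WRAP_XSLT)));
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        StringWriter out = new StringWriter();
        transformer.transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
        return out.toString();
    }

    public static void main(String[] args) throws TransformerException {
        // Hypothetical non-SOAP payload as it might arrive from the queue.
        System.out.println(wrap("<alert><id>42</id></alert>"));
    }
}
```

The original payload ends up unchanged inside the Body element, which is all the BPEL process needs in order to treat it as a SOAP request.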

Anyway.....there you have it. I can now accept a non-SOAP message from an ActiveMQ queue and invoke my BPEL process deployed to Cape Clear. So simple a monkey could do it ;) ......got any bananas?